Skip to content

Commit

Permalink
VIP - change event starter function
Browse files Browse the repository at this point in the history
  • Loading branch information
shahryarjb committed Jun 5, 2024
1 parent 3dee215 commit c324d9a
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 49 deletions.
12 changes: 12 additions & 0 deletions lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,18 @@ defmodule MishkaInstaller.Application do

@impl true
def start(_type, _args) do
if Mix.env() in [:test, :dev] and is_nil(Application.get_env(:mishka, Mishka.MnesiaRepo)) do
mnesia_dir = ".mnesia" <> "/#{Mix.env()}"

Application.put_env(:mishka, Mishka.MnesiaRepo,
mnesia_dir: mnesia_dir,
essential: [
MishkaInstaller.ProcessingPipelines.Queue.Queue,
MishkaInstaller.PluginsManagement.Event
]
)
end

children = [
{Registry, keys: :unique, name: MishkaJobWorkerRegistry},
{Phoenix.PubSub, name: MishkaInstaller.PubSub},
Expand Down
1 change: 1 addition & 0 deletions lib/mishka_installer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule MishkaInstaller do
MishkaInstaller.PubSub
|> Phoenix.PubSub.broadcast("mishka:plugin:#{channel}", %{
status: status,
channel: channel,
data: data
})
else
Expand Down
19 changes: 17 additions & 2 deletions lib/mnesia_repo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule MishkaInstaller.MnesiaRepo do
########################## (▰˘◡˘▰) Schema (▰˘◡˘▰) ############################
####################################################################################
defmodule State do
defstruct tables: [], schemas: []
defstruct tables: [], schemas: [], status: :started
end

################################################################################
Expand Down Expand Up @@ -65,13 +65,15 @@ defmodule MishkaInstaller.MnesiaRepo do

Logger.info("Identifier: #{inspect(@identifier)} ::: Mnesia tables Synchronized...")

essential_tables(Keyword.get(config, :essential))

MishkaInstaller.broadcast("mnesia", :started, %{
identifier: :mishka_mnesia_repo,
local_tables: :mnesia.system_info(:local_tables),
schemas: schemas()
})

essential_tables(Keyword.get(config, :essential))
Process.send_after(__MODULE__, :health_check, 1000)

{:ok, %State{tables: :mnesia.system_info(:local_tables), schemas: schemas()}}
else
Expand Down Expand Up @@ -106,6 +108,19 @@ defmodule MishkaInstaller.MnesiaRepo do
end

@impl true
def handle_info(:health_check, state) do
# TODO: this function can have some checker
Process.send_after(__MODULE__, :health_check, 1000)

MishkaInstaller.broadcast("mnesia", Map.get(state, :status), %{
identifier: :mishka_mnesia_repo,
local_tables: :mnesia.system_info(:local_tables),
schemas: schemas()
})

{:noreply, state}
end

def handle_info(info, state) do
Logger.warning("Identifier: #{inspect(@identifier)} ::: Unexpected info: #{inspect(info)}")
{:noreply, state}
Expand Down
95 changes: 48 additions & 47 deletions lib/plugins_management/event.ex
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
defmodule MishkaInstaller.PluginsManagement.Event do
use GuardedStruct
use GenServer
require Logger
alias MishkaDeveloperTools.Helper.{Extra, UUID}
alias MishkaInstaller.ProcessingPipelines.Queue.Queue
alias MnesiaAssistant.{Transaction, Query, Table}
alias MnesiaAssistant.Error, as: MError
alias MishkaInstaller.Helper.PluginModuleStateEvent
import MnesiaAssistant, only: [er: 1, erl_fields: 4]
alias MishkaInstaller.PluginsManagement.PluginWorker

# For testing
if Mix.env() in [:dev, :test] do
Code.ensure_loaded(MishkaInstaller.PluginsManagement.PluginWorker)
Code.ensure_loaded(PluginWorker)
end

####################################################################################
Expand Down Expand Up @@ -49,18 +51,15 @@ defmodule MishkaInstaller.PluginsManagement.Event do
################################################################################
######################## (▰˘◡˘▰) Init data (▰˘◡˘▰) #######################
################################################################################
@identifier :mishka_installer_event
@max_try 3
@wait_for_tables :timer.seconds(15)

@doc false
def start_link(_state) do
GenServer.start_link(__MODULE__, %{ref: nil, db_try: 0}, name: __MODULE__)
GenServer.start_link(__MODULE__, %{status: :registered}, name: __MODULE__)
end

@impl true
def init(state \\ %{}) do
{:ok, state, {:continue, :start_mnesia}}
:ok = MishkaInstaller.subscribe("mnesia")
{:ok, state}
end

def database_config() do
Expand All @@ -84,38 +83,53 @@ defmodule MishkaInstaller.PluginsManagement.Event do
####################################################################################
########################## (▰˘◡˘▰) Callback (▰˘◡˘▰) ##########################
####################################################################################
@impl true
def handle_continue(:start_mnesia, state) do
{:noreply, %{state | ref: mnesia_task_helper().ref, db_try: 1}}
end

def handle_continue(:start_service_restoring, state) do
if start() == :ok do
:persistent_term.put(:event_status, "ready")
end

{:noreply, state}
end

@impl true
# The task completed successfully
def handle_info({ref, _answer}, %{ref: ref} = state) do
# We don't care about the DOWN message now, so let's demonitor and flush it
Process.demonitor(ref, [:flush])
def handle_info(
%{
data: %{identifier: :mishka_mnesia_repo, local_tables: local_tables, schemas: _},
status: :started,
channel: "mnesia"
},
state
) do
new_state =
if __MODULE__ in local_tables and state.status == :registered do
if PluginWorker not in local_tables do
Logger.warning("Identifier: #{inspect(__MODULE__)} ::: Waiting for queue table")

MnesiaAssistant.Table.wait_for_tables([Queue], :infinity)

Logger.info("Identifier: #{inspect(PluginWorker)} ::: Mnesia tables Synchronized...")
end

Queue.new(%{
worker: MishkaInstaller.PluginsManagement.PluginWorker,
error: %{type: :continuously, max_try: 5, delay: 0}
})
if is_nil(Queue.read(worker: PluginWorker)) do
Queue.new(%{
worker: PluginWorker,
error: %{type: :continuously, max_try: 5, delay: 0}
})
|> case do
{:ok, _data} ->
if start() == :ok, do: :persistent_term.put(:event_status, "ready")
Map.merge(state, %{status: :started})

{:error, errors} ->
Logger.critical("Identifier: #{inspect(__MODULE__)} ::: Source: #{inspect(errors)}")
Map.merge(state, %{status: :stopped})
end
else
if start() == :ok, do: :persistent_term.put(:event_status, "ready")
Map.merge(state, %{status: :started})
end
else
state
end

# Do something with the result and then return
{:noreply, %{state | ref: nil}, {:continue, :start_service_restoring}}
{:noreply, new_state}
end

# The task failed
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
# Log and possibly restart the task...
{:noreply, %{state | ref: nil}}
def handle_info(_msg, state) do
{:noreply, state}
end

####################################################################################
Expand Down Expand Up @@ -149,7 +163,7 @@ defmodule MishkaInstaller.PluginsManagement.Event do
1
)
]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> Queue.put_r(worker: PluginWorker)
end

{:ok, :start, db_plg}
Expand Down Expand Up @@ -583,19 +597,6 @@ defmodule MishkaInstaller.PluginsManagement.Event do

defp plugin_status(_status), do: :ok

defp mnesia_task_helper() do
Task.Supervisor.async_nolink(
{:via, PartitionSupervisor, {MishkaInstaller.MnesiaTaskSupervisors, self()}},
fn ->
MnesiaAssistant.start("#{Mix.env()}", ".mnesia", @identifier)

_ =
{0, [Queue], @wait_for_tables, @max_try}
|> Table.start_table(__MODULE__, database_config(), @identifier)
end
)
end

defp delete_update_mnesia(map, args) do
values_tuple =
([__MODULE__] ++ Enum.map(keys(), &Map.get(map, &1)))
Expand Down

0 comments on commit c324d9a

Please sign in to comment.