Skip to content

Commit

Permalink
Add entries module of mnesia_repo, starter and etc
Browse files Browse the repository at this point in the history
  • Loading branch information
shahryarjb committed Jun 5, 2024
1 parent dd1ee85 commit 3dee215
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 33 deletions.
113 changes: 113 additions & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Code.ensure_loaded(MishkaInstaller.PluginsManagement.PluginWorker)
Code.ensure_loaded(Example.WoerkerTest)
Code.ensure_loaded(Example.WoerkerTest1)

alias MishkaInstaller.PluginsManagement.Event
alias MishkaInstaller.ProcessingPipelines.Queue.{Queue, Job}
Expand Down Expand Up @@ -49,21 +51,132 @@ put_r_full = fn ->
MnesiaAssistant.Transaction.transaction fn ->
[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r1", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end



[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r2", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r3", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r4", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r5", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r6", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
|> case do
{:error, errors} -> errors |> IO.inspect("PluginWorker")
_ -> nil
end



# **********************************************
Queue.new(%{worker: Example.WoerkerTest})

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r1", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_b_r2", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_b_r3", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_b_r4", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_b_r5", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_b_r6", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest")
_ -> nil
end

# **********************************************
Queue.new(%{worker: Example.WoerkerTest1})
[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_WoerkerTest1_r2", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest1)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest1")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_WoerkerTest1_r3", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest1)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest1")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_WoerkerTest1_r4", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest1)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest1")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_WoerkerTest1_r5", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest1)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest1")
_ -> nil
end

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_WoerkerTest1_r6", type: :start]}), 1)]
|> Queue.put_r(worker: Example.WoerkerTest1)
|> case do
{:error, errors} -> errors |> IO.inspect("WoerkerTest1")
_ -> nil
end

:ok
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule MishkaInstaller.Application do
child_spec: Task.Supervisor, name: MishkaInstaller.MnesiaTaskSupervisors},
{PartitionSupervisor,
child_spec: Task.Supervisor, name: MishkaInstaller.JobTaskSupervisors},
MishkaInstaller.MnesiaRepo,
MishkaInstaller.ProcessingPipelines.Queue.Queue,
MishkaInstaller.PluginsManagement.Event
]
Expand Down
186 changes: 186 additions & 0 deletions lib/mnesia_repo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
defmodule MishkaInstaller.MnesiaRepo do
use GenServer
require Logger
alias MnesiaAssistant.Error, as: MError

@identifier :mishka_mnesia_repo
####################################################################################
########################## (▰˘◡˘▰) Schema (▰˘◡˘▰) ############################
####################################################################################
defmodule State do
defstruct tables: [], schemas: []
end

################################################################################
######################## (▰˘◡˘▰) Init data (▰˘◡˘▰) #######################
################################################################################
def start_link(state \\ []) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end

@impl true
def init(_state \\ []) do
Logger.info(
"Identifier: #{inspect(@identifier)} ::: MnesiaMessage: Mnesia's initial valuation process has begun."
)

Logger.info(
"Identifier: #{inspect(@identifier)} ::: Stopping Mnesia to start and load Schema files."
)

MnesiaAssistant.stop() |> MError.error_description()

Logger.info(
"Identifier: #{inspect(@identifier)} ::: Creating the parent path of Mnesia if it doesn't exist."
)

Logger.info(
"Identifier: #{inspect(@identifier)} ::: Placing Mnesia address of the generator in the mentioned system variable."
)

mnesia_dir = ".mnesia" <> "/#{Mix.env()}"
config = Application.get_env(:mishka, Mishka.MnesiaRepo, mnesia_dir: mnesia_dir)
File.mkdir_p(config[:mnesia_dir]) |> MError.error_description(@identifier)
Application.put_env(:mnesia, :dir, config[:mnesia_dir] |> to_charlist)

my_node = node()
db_nodes = :mnesia.system_info(:db_nodes)

if my_node in db_nodes do
case MnesiaAssistant.Information.system_info(:extra_db_nodes) do
[] ->
MnesiaAssistant.Schema.create_schema([my_node]) |> MError.error_description(@identifier)

_ ->
:ok
end

MnesiaAssistant.start() |> MError.error_description(@identifier)

Logger.debug(
"Identifier: #{inspect(@identifier)} ::: Waiting for Mnesia tables synchronization..."
)

MnesiaAssistant.Table.wait_for_tables(:mnesia.system_info(:local_tables), :infinity)

Logger.info("Identifier: #{inspect(@identifier)} ::: Mnesia tables Synchronized...")

MishkaInstaller.broadcast("mnesia", :started, %{
identifier: :mishka_mnesia_repo,
local_tables: :mnesia.system_info(:local_tables),
schemas: schemas()
})

essential_tables(Keyword.get(config, :essential))

{:ok, %State{tables: :mnesia.system_info(:local_tables), schemas: schemas()}}
else
Logger.critical(
"Identifier: #{inspect(@identifier)} ::: Node name mismatch: I'm #{my_node}, the database is owned by #{db_nodes}"
)

{:stop, :node_name_mismatch}
end
end

####################################################################################
########################## (▰˘◡˘▰) Callback (▰˘◡˘▰) ##########################
####################################################################################
@impl true
def handle_call(:state, _from, state) do
{:reply, state, state}
end

def handle_call(request, from, state) do
Logger.warning(
"Identifier: #{inspect(@identifier)} ::: Unexpected call from #{inspect(from)}: #{inspect(request)}"
)

{:noreply, state}
end

@impl true
def handle_cast(msg, state) do
Logger.warning("Identifier: #{inspect(@identifier)} ::: Unexpected cast: #{inspect(msg)}")
{:noreply, state}
end

@impl true
def handle_info(info, state) do
Logger.warning("Identifier: #{inspect(@identifier)} ::: Unexpected info: #{inspect(info)}")
{:noreply, state}
end

@impl true
def terminate(_reason, _state) do
:ok
end

@impl true
def code_change(_old_vsn, state, _extra) do
{:ok, state}
end

####################################################################################
########################## (▰˘◡˘▰) Helper (▰˘◡˘▰) ############################
####################################################################################
def schema_path() do
dir =
case System.get_env("Mishka_MNESIA_SCHEMA") do
nil -> :mnesia.system_info(:directory)
path -> path
end

Path.join(dir, "mishka.schema")
end

def schemas() do
Enum.flat_map(:mnesia.system_info(:tables), fn
:schema ->
[]

tab ->
[
{tab,
[
{:storage_type, :mnesia.table_info(tab, :storage_type)},
{:local_content, :mnesia.table_info(tab, :local_content)}
]}
]
end)
end

defp essential_tables(nil), do: nil

defp essential_tables(essentials) do
essentials
|> Enum.map(fn item ->
with true <- item not in :mnesia.system_info(:local_tables),
true <- Code.ensure_loaded?(item),
function_exported?(item, :database_config, 0) do
MnesiaAssistant.Table.create_table(item, item.database_config())
|> MError.error_description(item)
|> case do
{:ok, :atomic} ->
Logger.info("Identifier: #{inspect(item)} ::: Mnesia table Synchronized...")

Logger.info(
"Identifier: #{inspect(item)} ::: Mnesia essentials table synchronization..."
)

MnesiaAssistant.Table.wait_for_tables([item], :infinity)

Logger.info(
"Identifier: #{inspect(item)} ::: Mnesia essentials table Synchronized..."
)

{:error, {:aborted, {:already_exists, module}}, _msg} ->
Logger.info("Identifier: #{inspect(module)} ::: Mnesia tables already exists...")

error ->
Logger.error("Identifier: #{inspect(item)} ::: Source: #{inspect(error)}")
end
end
end)
end
end
2 changes: 1 addition & 1 deletion lib/plugins_management/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule MishkaInstaller.PluginsManagement.Event do
{:ok, state, {:continue, :start_mnesia}}
end

defp database_config() do
def database_config() do
config = [
type: :set,
disc_copies: [node()],
Expand Down
Loading

0 comments on commit 3dee215

Please sign in to comment.