Skip to content

Commit

Permalink
VIP
Browse files Browse the repository at this point in the history
  • Loading branch information
shahryarjb committed Jun 3, 2024
1 parent 4b2a07f commit dd1ee85
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 163 deletions.
119 changes: 83 additions & 36 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -1,42 +1,89 @@
Code.ensure_loaded(MishkaInstaller.PluginsManagement.PluginWorker)

alias MishkaInstaller.PluginsManagement.Event
alias MishkaInstaller.ProcessingPipelines.Queue.{Queue, Job}

# These function for developer mod to force macro
Example.Subscribe.start_link()
Example.WoerkerTest.worker?()
Example.WoerkerTest1.worker?()

queued = [
%{entries: [role: "admin::Example.WoerkerTest"]},
%{entries: [role: "user::Example.WoerkerTest"]},
%{entries: [role: "blocked::Example.WoerkerTest"], timeout: 1}
]

queued1 = [
%{entries: [role: "admin::Example.WoerkerTest1,"]},
%{entries: [role: "user::Example.WoerkerTest1,"]},
%{entries: [role: "blocked::Example.WoerkerTest1,"]},
%{entries: [role: "bot::Example.WoerkerTest1,"]}
]


create_workers =
fn ->
Queue.create(%{worker: Example.WoerkerTest, queued: queued})
Queue.create(%{worker: Example.WoerkerTest1, queued: queued1})
end
bulk_start = fn ->
Example.Bulk.EventBEmail.start_link()
Example.Bulk.EventCEmail.start_link()
Example.Bulk.EventCOTP.start_link()
Example.Bulk.EventCSocial.start_link()
Example.Bulk.EventAEmail.start_link()
Example.Bulk.EventAOTP.start_link()
end

add_new_queues =
fn ->
[
elem(Queue.builder(%{entries: [role: "user1::new_queue::Example.WoerkerTest"]}), 1),
elem(Queue.builder(%{entries: [role: "user2::new_queue::Example.WoerkerTest"]}), 1),
elem(Queue.builder(%{entries: [role: "user3::new_queue::Example.WoerkerTest"]}), 1)
]
|> Queue.put_r(worker: Example.WoerkerTest)
end
bulk_start2 = fn ->
Example.Bulk.EventAEmail.start_link()
Example.Bulk.EventAOTP.start_link()
end

drop = fn ->
Queue.drop()
Event.drop()
end

bulk_queue = fn ->
bulk_start.()
Queue.read(worker: MishkaInstaller.PluginsManagement.PluginWorker)
end

qread = fn -> Queue.read(worker: MishkaInstaller.PluginsManagement.PluginWorker) end


put_r = fn ->
[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r", type: :start]}), 1), elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r_1", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
end

put = fn ->
[elem(Queue.builder(%{entries: [event: "test_a", type: :start]}), 1), elem(Queue.builder(%{entries: [event: "test_a", type: :start]}), 1)]
|> Queue.put(worker: MishkaInstaller.PluginsManagement.PluginWorker)
end

len = fn ->
get = Queue.read(worker: MishkaInstaller.PluginsManagement.PluginWorker)
{QueueAssistant.len(get.queued), length(get.running)}
end

add_new_queue =
fn ->
[elem(Queue.builder(%{entries: [role: "user1::new_queue::Example.WoerkerTest"]}), 1)] |> Queue.put_r(worker: Example.WoerkerTest)
put_r_full = fn ->
MnesiaAssistant.Transaction.transaction fn ->
[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r1", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r2", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r3", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r4", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r5", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

[elem(Queue.builder(%{timeout: 100_000, entries: [event: "test_a_r6", type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
:ok
end
end


stop = fn ->
drop.()
[
Example.Bulk.EventAEmail,
Example.Bulk.EventAOTP,
Example.Bulk.EventBEmail,
Example.Bulk.EventCEmail,
Example.Bulk.EventCOTP,
Example.Bulk.EventCSocial,
] |> Enum.map(&GenServer.stop(&1))

GenServer.stop(MishkaInstaller.PluginsManagement.PluginWorker)

Queue.new(%{
worker: MishkaInstaller.PluginsManagement.PluginWorker,
error: %{type: :continuously, max_try: 5, delay: 0}
})
end
15 changes: 13 additions & 2 deletions lib/plugins_management/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ defmodule MishkaInstaller.PluginsManagement.Event do
alias MnesiaAssistant.Error, as: MError
alias MishkaInstaller.Helper.PluginModuleStateEvent
import MnesiaAssistant, only: [er: 1, erl_fields: 4]

# For testing
if Mix.env() in [:dev, :test] do
Code.ensure_loaded(MishkaInstaller.PluginsManagement.PluginWorker)
end

####################################################################################
########################## (▰˘◡˘▰) Schema (▰˘◡˘▰) ############################
####################################################################################
Expand Down Expand Up @@ -99,7 +105,7 @@ defmodule MishkaInstaller.PluginsManagement.Event do

Queue.new(%{
worker: MishkaInstaller.PluginsManagement.PluginWorker,
error: %{type: :continuously, max_try: 5, delay: 1000}
error: %{type: :continuously, max_try: 5, delay: 0}
})

# Do something with the result and then return
Expand Down Expand Up @@ -137,7 +143,12 @@ defmodule MishkaInstaller.PluginsManagement.Event do
{:ok, db_plg} <- update(:status, :started, data.id),
:ok <- MishkaInstaller.broadcast("event", :start, data, broadcast) do
if(purge) do
[elem(Queue.builder(%{entries: [event: db_plg.event, type: :start]}), 1)]
[
elem(
Queue.builder(%{timeout: 100_000, entries: [event: db_plg.event, type: :start]}),
1
)
]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)
end

Expand Down
59 changes: 37 additions & 22 deletions lib/plugins_management/hook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
[MishkaInstaller.ProcessingPipelines.Queue.Queue, Event]
|> MnesiaAssistant.Table.wait_for_tables(@wait_for_tables)

# TODO: it needs something like

new_state =
if :persistent_term.get(:event_status, nil) == "ready" do
Hook.register_start_helper(__MODULE__, state)
Expand Down Expand Up @@ -131,28 +133,33 @@ defmodule MishkaInstaller.PluginsManagement.Hook do

def handle_info(%{status: status, data: data}, state)
when status in [:started, :stopped] do
event = Keyword.get(state, :event)
depends = Keyword.get(state, :depends)

# We need some state, it will be saved again or not, it should not be loaded if
# |__ it is restored
event_status = :persistent_term.get(:event_status, nil)

new_state =
with true <- event_status == "ready",
true <- event == data.event,
true <- data.name in depends,
:ok <- Event.hold_statuses?(depends),
{:ok, struct} <- Event.update(:status, :restarted, name: @plugin_name) do
[elem(Queue.builder(%{entries: [event: struct.event, type: :start]}), 1)]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

Keyword.merge(state, status: :restarted)
else
_ -> state
end
# event = Keyword.get(state, :event)
# depends = Keyword.get(state, :depends)

# # We need some state, it will be saved again or not, it should not be loaded if
# # |__ it is restored
# event_status = :persistent_term.get(:event_status, nil)

# new_state =
# with true <- event_status == "ready",
# true <- event == data.event,
# true <- data.name in depends,
# :ok <- Event.hold_statuses?(depends),
# {:ok, struct} <- Event.update(:status, :restarted, name: @plugin_name) do
# [
# elem(
# Queue.builder(%{timeout: 100_000, entries: [event: struct.event, type: :start]}),
# 1
# )
# ]
# |> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

# Keyword.merge(state, status: :restarted)
# else
# _ -> state
# end

{:noreply, new_state}
{:noreply, state}
end

def handle_info(_reason, state) do
Expand All @@ -166,8 +173,16 @@ defmodule MishkaInstaller.PluginsManagement.Hook do
####################################################################################
@doc false
def start_helper(module, state, reg_db_plg) do
case module.start_purge(true) do
case module.start_purge(true, false) do
{:ok, :start, st_db_plg} ->
[
elem(
Queue.builder(%{timeout: 100_000, entries: [event: st_db_plg.event, type: :start]}),
1
)
]
|> Queue.put_r(worker: MishkaInstaller.PluginsManagement.PluginWorker)

Keyword.merge(state, status: st_db_plg.status, depends: st_db_plg.depends)

{:error, [%{field: :event, action: :compile}]} ->
Expand Down
71 changes: 45 additions & 26 deletions lib/processing_pipelines/queue/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -160,37 +160,56 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Job do
########################## (▰˘◡˘▰) Callback (▰˘◡˘▰) ##########################
####################################################################################
@impl true
def handle_cast({:run_next, worker, _job}, state) do
def handle_cast({:run_next, worker, _passed_job}, state) do
job = Queue.read(worker: worker)

new_state =
case {job.status, job.running} do
{status, running} when running == [] and status in [:started, :no_queue] ->
case Queue.next(id: job.id) do
{:ok, data} ->
if state.refs == [],
do: run_queues_change_state(Map.get(data, :running), worker, state),
else: state

error ->
Logger.error(
"Identifier: #{inspect(worker)}; action: handle_cast; Source: #{inspect(error)}"
)

state
end

{:running, _running} ->
state

{status, running} when running != [] and status in [:started, :no_queue] ->
Queue.change_status(:running, worker: worker)
run_queues_change_state(running, worker, state)

_e ->
state
if state.refs == [] and job.status in [:started, :no_queue] do
case Queue.next(id: job.id) do
{:ok, data} ->
run_queues_change_state(Map.get(data, :running, []), worker, state)

error ->
Logger.error(
"Identifier: #{inspect(worker)}; action: handle_cast; Source: #{inspect(error)}"
)

state
end
else
state
end

# new_state =
# case {job.status, job.running} do
# {status, running} when running == [] and status in [:started, :no_queue] ->
# if state.refs == [] do
# case Queue.next(id: job.id) do
# {:ok, data} ->
# run_queues_change_state(Map.get(data, :running), worker, state)

# error ->
# Logger.error(
# "Identifier: #{inspect(worker)}; action: handle_cast; Source: #{inspect(error)}"
# )

# state
# end
# else
# state
# end

# {:running, _running} ->
# state

# {status, running} when running != [] and status in [:started, :no_queue] ->
# Queue.change_status(:running, worker: worker)
# run_queues_change_state(running, worker, state)

# _e ->
# state
# end

{:noreply, new_state}
end

Expand Down
38 changes: 20 additions & 18 deletions lib/processing_pipelines/queue/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,23 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Queue do
```
"""
def put(new_queues, args) when is_list(args) do
data_queued =
if is_list(new_queues), do: QueueAssistant.from_list(new_queues), else: new_queues

with {:ok, job_data, _worker_pid} <- get_job_data_when_worker_pid_is_alive(:put_queue, args),
:ok <- is_queue_helper?(:put_queue, data_queued),
:ok <- is_not_empty_queue_helper?(:put_queue, data_queued),
que <-
Map.merge(job_data, %{
queued: QueueAssistant.join_to_list(job_data.queued, data_queued),
status: if(job_data.status == :no_queue, do: :started, else: job_data.status)
}),
{:ok, data} <- update(que, job_data.id) do
{:ok, data}
end
MnesiaAssistant.Transaction.transaction(fn ->
data_queued =
if is_list(new_queues), do: QueueAssistant.from_list(new_queues), else: new_queues

with {:ok, job_data, _worker_pid} <-
get_job_data_when_worker_pid_is_alive(:put_queue, args),
:ok <- is_queue_helper?(:put_queue, data_queued),
:ok <- is_not_empty_queue_helper?(:put_queue, data_queued),
que <-
Map.merge(job_data, %{
queued: QueueAssistant.join_to_list(job_data.queued, data_queued),
status: if(job_data.status == :no_queue, do: :started, else: job_data.status)
}),
{:ok, data} <- update(que, job_data.id) do
{:ok, data}
end
end)
end

@doc """
Expand All @@ -253,7 +256,7 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Queue do
"""
def put_r(new_queues, args) when is_list(args) do
case put(new_queues, args) do
{:ok, data} ->
{:atomic, {:ok, data}} ->
Job.run_next({data.worker, data})
{:ok, data}

Expand Down Expand Up @@ -904,14 +907,13 @@ defmodule MishkaInstaller.ProcessingPipelines.Queue.Queue do
end
end

defp save_in_mnesia(map, args) do
defp save_in_mnesia(map, _args) do
values_tuple =
([__MODULE__] ++ Enum.map(Job.keys(), &Map.get(map, &1)))
|> List.to_tuple()

Transaction.transaction(fn ->
Query.delete(__MODULE__, args, :write)
Query.write(values_tuple)
Query.write(__MODULE__, values_tuple, :write)
end)
|> case do
{:atomic, :ok} ->
Expand Down
Loading

0 comments on commit dd1ee85

Please sign in to comment.