Skip to content

Commit

Permalink
Tied together tasks with jobs and channels
Browse files Browse the repository at this point in the history
  • Loading branch information
kieraneglin committed Jan 26, 2024
1 parent ea6b032 commit ec71ddc
Show file tree
Hide file tree
Showing 10 changed files with 259 additions and 16 deletions.
3 changes: 1 addition & 2 deletions lib/pinchflat/media/media_item.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ defmodule Pinchflat.Media.MediaItem do
@required_fields ~w(media_id channel_id)a
@allowed_fields ~w(title media_id video_filepath channel_id)a

# TODO: consider making an attached `metadata` model to store the JSON response from whatever backend is used
# TODO: make a tasks model to track the jobs spawned
# IDEA: consider making an attached `metadata` model to store the JSON response from whatever backend is used

schema "media_items" do
field :title, :string
Expand Down
10 changes: 7 additions & 3 deletions lib/pinchflat/media_source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Pinchflat.MediaSource do
import Ecto.Query, warn: false
alias Pinchflat.Repo

alias Pinchflat.Tasks
alias Pinchflat.Media
alias Pinchflat.Tasks.ChannelTasks
alias Pinchflat.MediaSource.Channel
Expand Down Expand Up @@ -63,8 +64,8 @@ defmodule Pinchflat.MediaSource do
original_url (if changed). May attempt to start indexing the channel's
media if the indexing frequency has been changed.
TODO: ensure that indexing is cancelled/rescheduled if the indexing frequency
has been changed.
Existing indexing tasks will be cancelled if the indexing frequency has been
changed (logic in `ChannelTasks.kickoff_indexing_task`)
Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}}
"""
Expand All @@ -75,9 +76,12 @@ defmodule Pinchflat.MediaSource do
end

@doc """
Deletes a channel. Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}}
Deletes a channel and it's associated tasks (of any state).
Returns {:ok, %Channel{}} | {:error, %Ecto.Changeset{}}
"""
def delete_channel(%Channel{} = channel) do
Tasks.delete_tasks_for(channel)
Repo.delete(channel)
end

Expand Down
84 changes: 83 additions & 1 deletion lib/pinchflat/tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Pinchflat.Tasks do
alias Pinchflat.Repo

alias Pinchflat.Tasks.Task
alias Pinchflat.MediaSource.Channel

@doc """
Returns the list of tasks. Returns [%Task{}, ...]
Expand All @@ -15,6 +16,36 @@ defmodule Pinchflat.Tasks do
Repo.all(Task)
end

@doc """
Returns the list of tasks for a given record type and ID. Optionally allows you to specify
which job states to include.
Returns [%Task{}, ...]
"""
def list_tasks_for(attached_record_type, attached_record_id, job_states \\ Oban.Job.states()) do
stringified_states = Enum.map(job_states, &to_string/1)

Repo.all(
from t in Task,
join: j in assoc(t, :job),
where: field(t, ^attached_record_type) == ^attached_record_id,
where: j.state in ^stringified_states
)
end

@doc """
Returns the list of pending tasks for a given record type and ID.
Returns [%Task{}, ...]
"""
def list_pending_tasks_for(attached_record_type, attached_record_id) do
list_tasks_for(
attached_record_type,
attached_record_id,
[:available, :scheduled, :retryable]
)
end

@doc """
Gets a single task.
Expand All @@ -31,13 +62,64 @@ defmodule Pinchflat.Tasks do
|> Repo.insert()
end

# This one's function signature is designed to help simplify
# usage of `create_job_with_task/2`
def create_task(%Oban.Job{} = job, %Channel{} = channel) do
%Task{}
|> Task.changeset(%{job_id: job.id, channel_id: channel.id})
|> Repo.insert()
end

@doc """
Creates a job from given attrs, creating a task with an attached record
if successful.
Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}.
"""
def create_job_with_task(job_attrs, task_attached_record) do
case Oban.insert(job_attrs) do
{:ok, job} -> create_task(job, task_attached_record)
err -> err
end
end

@doc """
Deletes a task. Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}.
Deletes a task. Also cancels any attached job.
Returns {:ok, %Task{}} | {:error, %Ecto.Changeset{}}.
"""
def delete_task(%Task{} = task) do
:ok = Oban.cancel_job(task.job_id)

Repo.delete(task)
end

@doc """
Deletes all tasks attached to a given record, cancelling any attached jobs.
Returns :ok
"""
def delete_tasks_for(%Channel{} = channel) do
tasks = list_tasks_for(:channel_id, channel.id)

Enum.each(tasks, fn task ->
delete_task(task)
end)
end

@doc """
Deletes all _pending_ tasks attached to a given record, cancelling any attached jobs.
Returns :ok
"""
def delete_pending_tasks_for(%Channel{} = channel) do
tasks = list_pending_tasks_for(:channel_id, channel.id)

Enum.each(tasks, fn task ->
delete_task(task)
end)
end

@doc """
Returns an `%Ecto.Changeset{}` for tracking task changes.
"""
Expand Down
11 changes: 5 additions & 6 deletions lib/pinchflat/tasks/channel_tasks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,24 @@ defmodule Pinchflat.Tasks.ChannelTasks do
This module contains methods for managing tasks (workers) related to channels.
"""

alias Pinchflat.Tasks
alias Pinchflat.MediaSource.Channel
alias Pinchflat.Workers.MediaIndexingWorker

@doc """
Starts tasks for indexing a channel's media.
TODO: modify so that updates cancel/reschedule existing tasks as-needed
TODO: modify so that deletion cancels existing tasks (or maybe can do from Postgres?)
TODO: modify so that starting a worker adds a Task record (not implemented yet)
Starts tasks for indexing a channel's media. Returns {:ok, :should_not_index} | {:ok, %Task{}}.
"""
def kickoff_indexing_task(%Channel{} = channel) do
Tasks.delete_pending_tasks_for(channel)

if channel.index_frequency_minutes <= 0 do
{:ok, :should_not_index}
else
channel
|> Map.take([:id])
# Schedule this one immediately, but future ones will be on an interval
|> MediaIndexingWorker.new()
|> Oban.insert()
|> Tasks.create_job_with_task(channel)
end
end
end
8 changes: 5 additions & 3 deletions lib/pinchflat/workers/media_indexing_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do

use Oban.Worker,
queue: :media_indexing,
unique: [period: :infinity, states: [:available, :scheduled]]
unique: [period: :infinity, states: [:available, :scheduled, :retryable]],
tags: ["media_source", "media_indexing"]

alias __MODULE__
alias Pinchflat.Tasks
alias Pinchflat.MediaSource

@impl Oban.Worker
Expand All @@ -23,7 +25,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
actually run every 1 hour and 30 minutes. The tradeoff of not inundating
the API with requests and also not overlapping jobs is worth it, IMO.
Returns :ok | {:ok, %Oban.Job{}}. Not that it matters.
Returns :ok | {:ok, %Task{}}. Not that it matters.
"""
def perform(%Oban.Job{args: %{"id" => channel_id}}) do
channel = MediaSource.get_channel!(channel_id)
Expand All @@ -41,6 +43,6 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
channel
|> Map.take([:id])
|> MediaIndexingWorker.new(schedule_in: channel.index_frequency_minutes * 60)
|> Oban.insert()
|> Tasks.create_job_with_task(channel)
end
end
3 changes: 2 additions & 1 deletion priv/repo/migrations/20240125212753_create_tasks.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ defmodule Pinchflat.Repo.Migrations.CreateTasks do
def change do
create table(:tasks) do
add :job_id, references(:oban_jobs, on_delete: :delete_all), null: false
add :channel_id, references(:channels, on_delete: :delete_all), null: true
# `restrict` because we need to be sure to delete pending tasks when a channel is deleted
add :channel_id, references(:channels, on_delete: :restrict), null: true

timestamps(type: :utc_datetime)
end
Expand Down
9 changes: 9 additions & 0 deletions test/pinchflat/media_source_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Pinchflat.MediaSourceTest do
use Pinchflat.DataCase
import Mox
import Pinchflat.TasksFixtures
import Pinchflat.ProfilesFixtures
import Pinchflat.MediaSourceFixtures

Expand Down Expand Up @@ -218,6 +219,14 @@ defmodule Pinchflat.MediaSourceTest do
channel = channel_fixture()
assert %Ecto.Changeset{} = MediaSource.change_channel(channel)
end

test "deletion also deletes all associated tasks" do
channel = channel_fixture()
task = task_fixture(channel_id: channel.id)

assert {:ok, %Channel{}} = MediaSource.delete_channel(channel)
assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
end

describe "change_channel/2" do
Expand Down
19 changes: 19 additions & 0 deletions test/pinchflat/tasks/channel_tasks_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
defmodule Pinchflat.Tasks.ChannelTasksTest do
use Pinchflat.DataCase

import Pinchflat.TasksFixtures
import Pinchflat.MediaSourceFixtures

alias Pinchflat.Tasks.Task
alias Pinchflat.Tasks.ChannelTasks
alias Pinchflat.Workers.MediaIndexingWorker

Expand All @@ -22,5 +24,22 @@ defmodule Pinchflat.Tasks.ChannelTasksTest do

assert_enqueued(worker: MediaIndexingWorker, args: %{"id" => channel.id})
end

test "it creates and attaches a task if the interval is > 0" do
channel = channel_fixture(index_frequency_minutes: 1)

assert {:ok, %Task{} = task} = ChannelTasks.kickoff_indexing_task(channel)

assert task.channel_id == channel.id
end

test "it deletes any pending tasks for the channel" do
channel = channel_fixture()
task = task_fixture(channel_id: channel.id)

assert {:ok, _} = ChannelTasks.kickoff_indexing_task(channel)

assert_raise Ecto.NoResultsError, fn -> Repo.reload!(task) end
end
end
end
Loading

0 comments on commit ec71ddc

Please sign in to comment.