Skip to content

Commit

Permalink
Hooked up video downloading to the channel indexing pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
kieraneglin committed Jan 30, 2024
1 parent 95a766f commit a0d62a1
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 9 deletions.
2 changes: 1 addition & 1 deletion lib/pinchflat/media.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ defmodule Pinchflat.Media do
"""

import Ecto.Query, warn: false
alias Pinchflat.Repo

alias Pinchflat.Repo
alias Pinchflat.Media.MediaItem

@doc """
Expand Down
26 changes: 22 additions & 4 deletions lib/pinchflat/workers/media_indexing_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
alias __MODULE__
alias Pinchflat.Tasks
alias Pinchflat.MediaSource
alias Pinchflat.Media.MediaItem
alias Pinchflat.Workers.VideoDownloadWorker

@impl Oban.Worker
@doc """
The ID is that of a channel _record_, not a YouTube channel ID.
The ID is that of a channel _record_, not a YouTube channel ID. Indexes
the provided channel, kicks off downloads for each new MediaItem, and
reschedules the job to run again in the future (as determined by the
channel's `index_frequency_minutes` field).
NOTE: Re-scheduling here works a little different than you may expect.
README: Re-scheduling here works a little differently than you may expect.
The reschedule time is relative to the time the job has actually _completed_.
This has some benefits but also side effects to be aware of:
Expand All @@ -30,7 +35,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
IDEA: Should I use paging and do indexing in chunks? Is that even faster?
Returns :ok | {:ok, %Task{}}. Not that it matters.
Returns :ok | {:ok, %Task{}}
"""
def perform(%Oban.Job{args: %{"id" => channel_id}}) do
channel = MediaSource.get_channel!(channel_id)
Expand All @@ -43,7 +48,20 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do
end

defp index_media_and_reschedule(channel) do
MediaSource.index_media_items(channel)
channel
|> MediaSource.index_media_items()
|> Enum.each(fn media_item_or_changeset ->
case media_item_or_changeset do
%MediaItem{} = media_item ->
media_item
|> Map.take([:id])
|> VideoDownloadWorker.new()
|> Oban.insert()

_ ->
nil
end
end)

channel
|> Map.take([:id])
Expand Down
28 changes: 24 additions & 4 deletions test/pinchflat/workers/media_indexing_worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do

alias Pinchflat.Tasks
alias Pinchflat.Workers.MediaIndexingWorker
alias Pinchflat.Workers.VideoDownloadWorker

setup :verify_on_exit!

Expand All @@ -28,15 +29,34 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end

# Smoke test: performs the indexing job for a channel that has an index
# frequency set, with the yt-dlp runner mocked to return empty output.
# There are no explicit assertions here; the test relies on the Mox
# expectations being checked by the `verify_on_exit!` setup callback.
test "it indexes the channel if it should be indexed" do
# NOTE(review): the two `expect` lines below look like the old and new
# versions of the same line from a rendered diff - confirm that only one
# of them exists in the real file.
expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)

channel = channel_fixture(index_frequency_minutes: 10)

perform_job(MediaIndexingWorker, %{id: channel.id})
end

test "it kicks off a download job for each new media item" do
  # The mocked runner returns a single line of output ("video1"),
  # presumably one media ID per line - so one new media item is expected.
  expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1"} end)

  %{id: channel_id} = channel_fixture(index_frequency_minutes: 10)
  perform_job(MediaIndexingWorker, %{id: channel_id})

  # Exactly one download job should have been enqueued
  download_jobs = all_enqueued(worker: VideoDownloadWorker)
  assert [_only_job] = download_jobs
end

test "it does not kick off a job for media items that could not be saved" do
  # The mocked runner output repeats the same entry ("video1") twice
  expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1\nvideo1"} end)

  %{id: channel_id} = channel_fixture(index_frequency_minutes: 10)
  perform_job(MediaIndexingWorker, %{id: channel_id})

  # The second video is a duplicate, so only a single download job
  # is expected to be enqueued
  download_jobs = all_enqueued(worker: VideoDownloadWorker)
  assert [_only_job] = download_jobs
end

test "it reschedules the job based on the index frequency" do
expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)

channel = channel_fixture(index_frequency_minutes: 10)
perform_job(MediaIndexingWorker, %{id: channel.id})
Expand All @@ -49,7 +69,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end

test "it creates a task for the rescheduled job" do
expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end)

channel = channel_fixture(index_frequency_minutes: 10)
task_count_fetcher = fn -> Enum.count(Tasks.list_tasks()) end
Expand All @@ -60,7 +80,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do
end

test "it creates the basic media_item records" do
expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, "video1\nvideo2"} end)
expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1\nvideo2"} end)

channel = channel_fixture(index_frequency_minutes: 10)

Expand Down

0 comments on commit a0d62a1

Please sign in to comment.