From a0d62a156a5b8916510cf3d851a0ac66420b2004 Mon Sep 17 00:00:00 2001 From: Kieran Eglin Date: Mon, 29 Jan 2024 20:36:59 -0800 Subject: [PATCH] Hooked up video downloading to the channel indexing pipeline --- lib/pinchflat/media.ex | 2 +- .../workers/media_indexing_worker.ex | 26 ++++++++++++++--- .../workers/media_indexing_worker_test.exs | 28 ++++++++++++++++--- 3 files changed, 47 insertions(+), 9 deletions(-) diff --git a/lib/pinchflat/media.ex b/lib/pinchflat/media.ex index 7e93f33e..9178e728 100644 --- a/lib/pinchflat/media.ex +++ b/lib/pinchflat/media.ex @@ -4,8 +4,8 @@ defmodule Pinchflat.Media do """ import Ecto.Query, warn: false - alias Pinchflat.Repo + alias Pinchflat.Repo alias Pinchflat.Media.MediaItem @doc """ diff --git a/lib/pinchflat/workers/media_indexing_worker.ex b/lib/pinchflat/workers/media_indexing_worker.ex index 5189d0d3..c3771475 100644 --- a/lib/pinchflat/workers/media_indexing_worker.ex +++ b/lib/pinchflat/workers/media_indexing_worker.ex @@ -9,12 +9,17 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do alias __MODULE__ alias Pinchflat.Tasks alias Pinchflat.MediaSource + alias Pinchflat.Media.MediaItem + alias Pinchflat.Workers.VideoDownloadWorker @impl Oban.Worker @doc """ - The ID is that of a channel _record_, not a YouTube channel ID. + The ID is that of a channel _record_, not a YouTube channel ID. Indexes + the provided channel, kicks off downloads for each new MediaItem, and + reschedules the job to run again in the future (as determined by the + channel's `index_frequency_minutes` field). - NOTE: Re-scheduling here works a little different than you may expect. + README: Re-scheduling here works a little different than you may expect. The reschedule time is relative to the time the job has actually _completed_. This has some benefits but also side effects to be aware of: @@ -30,7 +35,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do IDEA: Should I use paging and do indexing in chunks? Is that even faster? - Returns :ok | {:ok, %Task{}}. Not that it matters. + Returns :ok | {:ok, %Task{}} """ def perform(%Oban.Job{args: %{"id" => channel_id}}) do channel = MediaSource.get_channel!(channel_id) @@ -43,7 +48,20 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do end defp index_media_and_reschedule(channel) do - MediaSource.index_media_items(channel) + channel + |> MediaSource.index_media_items() + |> Enum.each(fn media_item_or_changeset -> + case media_item_or_changeset do + %MediaItem{} = media_item -> + media_item + |> Map.take([:id]) + |> VideoDownloadWorker.new() + |> Oban.insert() + + _ -> + nil + end + end) channel |> Map.take([:id]) diff --git a/test/pinchflat/workers/media_indexing_worker_test.exs b/test/pinchflat/workers/media_indexing_worker_test.exs index 554c4b41..2683e1ed 100644 --- a/test/pinchflat/workers/media_indexing_worker_test.exs +++ b/test/pinchflat/workers/media_indexing_worker_test.exs @@ -6,6 +6,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do alias Pinchflat.Tasks alias Pinchflat.Workers.MediaIndexingWorker + alias Pinchflat.Workers.VideoDownloadWorker setup :verify_on_exit! @@ -28,15 +29,34 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it indexes the channel if it should be indexed" do - expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) channel = channel_fixture(index_frequency_minutes: 10) perform_job(MediaIndexingWorker, %{id: channel.id}) end + test "it kicks off a download job for each new media item" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1"} end) + + channel = channel_fixture(index_frequency_minutes: 10) + perform_job(MediaIndexingWorker, %{id: channel.id}) + + assert [_] = all_enqueued(worker: VideoDownloadWorker) + end + + test "it does not kick off a job for media items that could not be saved" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1\nvideo1"} end) + + channel = channel_fixture(index_frequency_minutes: 10) + perform_job(MediaIndexingWorker, %{id: channel.id}) + + # Only one job should be enqueued, since the second video is a duplicate + assert [_] = all_enqueued(worker: VideoDownloadWorker) + end + test "it reschedules the job based on the index frequency" do - expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) channel = channel_fixture(index_frequency_minutes: 10) perform_job(MediaIndexingWorker, %{id: channel.id}) @@ -49,7 +69,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it creates a task for the rescheduled job" do - expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) channel = channel_fixture(index_frequency_minutes: 10) task_count_fetcher = fn -> Enum.count(Tasks.list_tasks()) end @@ -60,7 +80,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it creates the basic media_item records" do - expect(YtDlpRunnerMock, :run, 1, fn _url, _opts, _ot -> {:ok, "video1\nvideo2"} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, "video1\nvideo2"} end) channel = channel_fixture(index_frequency_minutes: 10)