From f55cdc80dd2ad2b07e09e8b5cb0e7941fdf55b31 Mon Sep 17 00:00:00 2001 From: Kieran Date: Mon, 4 Mar 2024 17:14:02 -0800 Subject: [PATCH] Streaming media item creation during indexing (#49) * Implemented streaming during indexing * Updated file watcher to enqueue download; refactored download worker methods * Updated File Follower Server timeout --- .iex.exs | 2 + config/config.exs | 3 +- config/test.exs | 3 +- lib/pinchflat/media.ex | 18 +++ .../backends/backend_command_runner.ex | 1 + .../backends/yt_dlp/command_runner.ex | 29 ++-- .../backends/yt_dlp/video_collection.ex | 20 ++- lib/pinchflat/media_client/source_details.ex | 15 +- lib/pinchflat/tasks.ex | 1 - lib/pinchflat/tasks/media_item_tasks.ex | 6 +- lib/pinchflat/tasks/source_tasks.ex | 132 ++++++++++++++--- lib/pinchflat/utils/filesystem_utils.ex | 23 +++ .../filesystem_utils/file_follower_server.ex | 121 ++++++++++++++++ .../workers/media_indexing_worker.ex | 11 +- .../backends/yt_dlp/command_runner_test.exs | 8 +- .../backends/yt_dlp/video_collection_test.exs | 35 +++-- .../media_client/source_details_test.exs | 25 +++- test/pinchflat/media_test.exs | 28 ++++ test/pinchflat/tasks/source_tasks_test.exs | 136 ++++++++++++++++-- .../file_follower_server_test.exs | 52 +++++++ .../pinchflat/utils/filesystem_utils_test.exs | 16 +++ .../workers/media_indexing_worker_test.exs | 26 ++-- 22 files changed, 613 insertions(+), 98 deletions(-) create mode 100644 lib/pinchflat/utils/filesystem_utils.ex create mode 100644 lib/pinchflat/utils/filesystem_utils/file_follower_server.ex create mode 100644 test/pinchflat/utils/filesystem_utils/file_follower_server_test.exs create mode 100644 test/pinchflat/utils/filesystem_utils_test.exs diff --git a/.iex.exs b/.iex.exs index b15931d0..fbc5a164 100644 --- a/.iex.exs +++ b/.iex.exs @@ -15,6 +15,8 @@ alias Pinchflat.Sources alias Pinchflat.MediaClient.{SourceDetails, VideoDownloader} alias Pinchflat.Metadata.{Zipper, ThumbnailFetcher} +alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer + defmodule IexHelpers do def playlist_url do "https://www.youtube.com/playlist?list=PLmqC3wPkeL8kSlTCcSMDD63gmSi7evcXS" diff --git a/config/config.exs b/config/config.exs index 36969600..617613be 100644 --- a/config/config.exs +++ b/config/config.exs @@ -20,7 +20,8 @@ config :pinchflat, # Setting AUTH_USERNAME and AUTH_PASSWORD implies you want to use basic auth. # If either is unset, basic auth will not be used. basic_auth_username: System.get_env("AUTH_USERNAME"), - basic_auth_password: System.get_env("AUTH_PASSWORD") + basic_auth_password: System.get_env("AUTH_PASSWORD"), + file_watcher_poll_interval: 1000 # Configures the endpoint config :pinchflat, PinchflatWeb.Endpoint, diff --git a/config/test.exs b/config/test.exs index 005d72fd..99821b6b 100644 --- a/config/test.exs +++ b/config/test.exs @@ -5,7 +5,8 @@ config :pinchflat, yt_dlp_executable: Path.join([File.cwd!(), "/test/support/scripts/yt-dlp-mocks/repeater.sh"]), media_directory: Path.join([System.tmp_dir!(), "test", "videos"]), metadata_directory: Path.join([System.tmp_dir!(), "test", "metadata"]), - tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"]) + tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"]), + file_watcher_poll_interval: 50 config :pinchflat, Oban, testing: :manual diff --git a/lib/pinchflat/media.ex b/lib/pinchflat/media.ex index 6f93d1b6..db304c11 100644 --- a/lib/pinchflat/media.ex +++ b/lib/pinchflat/media.ex @@ -66,6 +66,24 @@ defmodule Pinchflat.Media do |> Repo.all() end + @doc """ + For a given media_item, tells you if it is pending download. This is defined as + the media_item having a `media_filepath` of `nil` and matching the format selection + rules of the parent media_profile. + + Intentionally does not take the `download_media` setting of the source into account. + + Returns boolean() + """ + def pending_download?(%MediaItem{} = media_item) do + media_profile = Repo.preload(media_item, source: :media_profile).source.media_profile + + MediaItem + |> where([mi], mi.id == ^media_item.id and is_nil(mi.media_filepath)) + |> where(^build_format_clauses(media_profile)) + |> Repo.exists?() + end + @doc """ Returns a list of media_items that match the search term. Adds a `matching_search_term` virtual field to the result set. diff --git a/lib/pinchflat/media_client/backends/backend_command_runner.ex b/lib/pinchflat/media_client/backends/backend_command_runner.ex index 272286f8..cff94518 100644 --- a/lib/pinchflat/media_client/backends/backend_command_runner.ex +++ b/lib/pinchflat/media_client/backends/backend_command_runner.ex @@ -4,4 +4,5 @@ defmodule Pinchflat.MediaClient.Backends.BackendCommandRunner do """ @callback run(binary(), keyword(), binary()) :: {:ok, binary()} | {:error, binary(), integer()} + @callback run(binary(), keyword(), binary(), keyword()) :: {:ok, binary()} | {:error, binary(), integer()} end diff --git a/lib/pinchflat/media_client/backends/yt_dlp/command_runner.ex b/lib/pinchflat/media_client/backends/yt_dlp/command_runner.ex index 9abe6cb7..c9fec4a5 100644 --- a/lib/pinchflat/media_client/backends/yt_dlp/command_runner.ex +++ b/lib/pinchflat/media_client/backends/yt_dlp/command_runner.ex @@ -6,6 +6,7 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do require Logger alias Pinchflat.Utils.StringUtils + alias Pinchflat.Utils.FilesystemUtils, as: FSUtils alias Pinchflat.MediaClient.Backends.BackendCommandRunner @behaviour BackendCommandRunner @@ -15,19 +16,20 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do a file and then returns its contents because yt-dlp will return warnings to stdout even if the command is successful, but these will break JSON parsing. - Returns {:ok, binary()} | {:error, output, status}. + Additional Opts: + - :output_filepath - the path to save the output to. If not provided, a temporary + file will be created and used. Useful for if you need a reference to the file + for a file watcher. - IDEA: Indexing takes a long time, but the output is actually streamed to stdout. - Maybe we could listen to that stream instead so we can index videos as they're discovered. - See: https://stackoverflow.com/a/49061086/5665799 + Returns {:ok, binary()} | {:error, output, status}. """ @impl BackendCommandRunner - def run(url, command_opts, output_template) do + def run(url, command_opts, output_template, addl_opts \\ []) do command = backend_executable() # These must stay in exactly this order, hence why I'm giving it its own variable. # Also, can't use RAM file since yt-dlp needs a concrete filepath. - json_output_path = generate_json_output_path() - print_to_file_opts = [{:print_to_file, output_template}, json_output_path] + output_filepath = Keyword.get(addl_opts, :output_filepath, FSUtils.generate_metadata_tmpfile(:json)) + print_to_file_opts = [{:print_to_file, output_template}, output_filepath] formatted_command_opts = [url] ++ parse_options(command_opts ++ print_to_file_opts) Logger.info("[yt-dlp] called with: #{Enum.join(formatted_command_opts, " ")}") @@ -36,24 +38,13 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do {_, 0} -> # IDEA: consider deleting the file after reading it # (even on error? especially on error?) - File.read(json_output_path) + File.read(output_filepath) {output, status} -> {:error, output, status} end end - defp generate_json_output_path do - tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory) - filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.json"]) - - # Ensure the file can be created and written to BEFORE we run the `yt-dlp` command - :ok = File.mkdir_p!(Path.dirname(filepath)) - :ok = File.write(filepath, "") - - filepath - end - # We want to satisfy the following behaviours: # # 1. If the key is an atom, convert it to a string and convert it to kebab case (for convenience) diff --git a/lib/pinchflat/media_client/backends/yt_dlp/video_collection.ex b/lib/pinchflat/media_client/backends/yt_dlp/video_collection.ex index 6233c8d1..faf14d7a 100644 --- a/lib/pinchflat/media_client/backends/yt_dlp/video_collection.ex +++ b/lib/pinchflat/media_client/backends/yt_dlp/video_collection.ex @@ -4,19 +4,33 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollection do videos (aka: a source [ie: channels, playlists]). """ + require Logger + alias Pinchflat.Utils.FunctionUtils + alias Pinchflat.Utils.FilesystemUtils @doc """ Returns a list of maps representing the videos in the collection. + Options: + - :file_listener_handler - a function that will be called with the path to the + file that will be written to when yt-dlp is done. This is useful for + setting up a file watcher to know when the file is ready to be read. + Returns {:ok, [map()]} | {:error, any, ...}. """ - def get_media_attributes(url, command_opts \\ []) do + def get_media_attributes(url, addl_opts \\ []) do runner = Application.get_env(:pinchflat, :yt_dlp_runner) - opts = command_opts ++ [:simulate, :skip_download] + command_opts = [:simulate, :skip_download] output_template = "%(.{id,title,was_live,original_url,description})j" + output_filepath = FilesystemUtils.generate_metadata_tmpfile(:json) + file_listener_handler = Keyword.get(addl_opts, :file_listener_handler, false) + + if file_listener_handler do + file_listener_handler.(output_filepath) + end - case runner.run(url, opts, output_template) do + case runner.run(url, command_opts, output_template, output_filepath: output_filepath) do {:ok, output} -> output |> String.split("\n", trim: true) diff --git a/lib/pinchflat/media_client/source_details.ex b/lib/pinchflat/media_client/source_details.ex index 26f205a1..000ada20 100644 --- a/lib/pinchflat/media_client/source_details.ex +++ b/lib/pinchflat/media_client/source_details.ex @@ -22,16 +22,21 @@ defmodule Pinchflat.MediaClient.SourceDetails do Returns a list of basic video data mapsfor the given source URL OR source record using the given backend. + Options: + - :file_listener_handler - a function that will be called with the path to the + file that will be written to by yt-dlp. This is useful for + setting up a file watcher to read the file as it gets written to. + Returns {:ok, [map()]} | {:error, any, ...}. """ - def get_media_attributes(sourceable, backend \\ :yt_dlp) + def get_media_attributes(sourceable, opts \\ [], backend \\ :yt_dlp) - def get_media_attributes(%Source{} = source, backend) do - source_module(backend).get_media_attributes(source.collection_id) + def get_media_attributes(%Source{} = source, opts, backend) do + get_media_attributes(source.collection_id, opts, backend) end - def get_media_attributes(source_url, backend) when is_binary(source_url) do - source_module(backend).get_media_attributes(source_url) + def get_media_attributes(source_url, opts, backend) when is_binary(source_url) do + source_module(backend).get_media_attributes(source_url, opts) end defp source_module(backend) do diff --git a/lib/pinchflat/tasks.ex b/lib/pinchflat/tasks.ex index e7ae4316..6426450f 100644 --- a/lib/pinchflat/tasks.ex +++ b/lib/pinchflat/tasks.ex @@ -2,7 +2,6 @@ defmodule Pinchflat.Tasks do @moduledoc """ The Tasks context. """ - import Ecto.Query, warn: false alias Pinchflat.Repo diff --git a/lib/pinchflat/tasks/media_item_tasks.ex b/lib/pinchflat/tasks/media_item_tasks.ex index c3cad23c..c733ae6f 100644 --- a/lib/pinchflat/tasks/media_item_tasks.ex +++ b/lib/pinchflat/tasks/media_item_tasks.ex @@ -1,7 +1,9 @@ defmodule Pinchflat.Tasks.MediaItemTasks do @moduledoc """ - This module contains methods used by or used to control tasks (aka workers) - related to media items. + Contains methods used by OR used to create/manage tasks for media items. + + Tasks/workers are meant to be thin wrappers so most of the actual work they + do is also defined here. Essentially, a one-stop-shop for media-related tasks/workers. """ alias Pinchflat.Media diff --git a/lib/pinchflat/tasks/source_tasks.ex b/lib/pinchflat/tasks/source_tasks.ex index 360345ea..302e7854 100644 --- a/lib/pinchflat/tasks/source_tasks.ex +++ b/lib/pinchflat/tasks/source_tasks.ex @@ -1,16 +1,22 @@ defmodule Pinchflat.Tasks.SourceTasks do @moduledoc """ - This module contains methods used by or used to control tasks (aka workers) - related to sources. + Contains methods used by OR used to create/manage tasks for sources. + + Tasks/workers are meant to be thin wrappers so most of the actual work they + do is also defined here. Essentially, a one-stop-shop for source-related tasks/workers. """ + require Logger + alias Pinchflat.Media alias Pinchflat.Tasks alias Pinchflat.Sources alias Pinchflat.Sources.Source + alias Pinchflat.Media.MediaItem alias Pinchflat.MediaClient.SourceDetails alias Pinchflat.Workers.MediaIndexingWorker alias Pinchflat.Workers.VideoDownloadWorker + alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer @doc """ Starts tasks for indexing a source's media regardless of the source's indexing @@ -35,30 +41,37 @@ defmodule Pinchflat.Tasks.SourceTasks do @doc """ Given a media source, creates (indexes) the media by creating media_items for each - media ID in the source. + media ID in the source. Afterward, kicks off a download task for each pending media + item belonging to the source. You can't tell me the method name isn't descriptive! + + Indexing is slow and usually returns a list of all media data at once for record creation. + To help with this, we use a file follower to watch the file that yt-dlp writes to + so we can create media items as they come in. This parallelizes the process and adds + clarity to the user experience. This has a few things to be aware of which are documented + below in the file watcher setup method. + + NOTE: downloads are only enqueued if the source is set to download media. Downloads are + also enqueued for ALL pending media items, not just the ones that were indexed in this + job run. This should ensure that any stragglers are caught if, for some reason, they + weren't enqueued or somehow got de-queued. + + Since indexing returns all media data EVERY TIME, we rely on the unique index of the + media_id to prevent duplicates. Due to both the file follower and the fact that future + indexing will index a lot of existing data, this method will MOSTLY return error + changesets (from the unique index violation) and not media items. This is intended. Returns [%MediaItem{}, ...] | [%Ecto.Changeset{}, ...] """ - def index_media_items(%Source{} = source) do - {:ok, media_attributes} = SourceDetails.get_media_attributes(source.original_url) + def index_and_enqueue_download_for_media_items(%Source{} = source) do + # See the method definition below for more info on how file watchers work + # (important reading if you're not familiar with it) + {:ok, media_attributes} = get_media_attributes_and_setup_file_watcher(source) + + result = Enum.map(media_attributes, fn media_attrs -> create_media_item_from_attributes(source, media_attrs) end) Sources.update_source(source, %{last_indexed_at: DateTime.utc_now()}) + enqueue_pending_media_tasks(source) - media_attributes - |> Enum.map(fn media_attrs -> - attrs = %{ - source_id: source.id, - title: media_attrs["title"], - media_id: media_attrs["id"], - original_url: media_attrs["original_url"], - livestream: media_attrs["was_live"], - description: media_attrs["description"] - } - - case Media.create_media_item(attrs) do - {:ok, media_item} -> media_item - {:error, changeset} -> changeset - end - end) + result end @doc """ @@ -70,8 +83,6 @@ defmodule Pinchflat.Tasks.SourceTasks do that any stragglers are caught if, for some reason, they weren't enqueued or somehow got de-queued. - I'm not sure of a case where this would happen, but it's cheap insurance. - Returns :ok """ def enqueue_pending_media_tasks(%Source{download_media: true} = source) do @@ -99,4 +110,79 @@ defmodule Pinchflat.Tasks.SourceTasks do |> Media.list_pending_media_items_for() |> Enum.each(&Tasks.delete_pending_tasks_for/1) end + + # The file follower is a GenServer that watches a file for new lines and + # processes them. This works well, but we have to be resilliant to partially-written + # lines (ie: you should gracefully fail if you can't parse a line). + # + # This works in-tandem with the normal (blocking) media indexing behaviour. When + # the `get_media_attributes` method completes it'll return the FULL result to + # the caller for parsing. Ideally, every item in the list will have already + # been processed by the file follower, but if not, the caller handles creation + # of any media items that were missed/initially failed. + # + # It attempts a graceful shutdown of the file follower after the indexing is done, + # but the FileFollowerServer will also stop itself if it doesn't see any activity + # for a sufficiently long time. + defp get_media_attributes_and_setup_file_watcher(source) do + {:ok, pid} = FileFollowerServer.start_link() + + handler = fn filepath -> setup_file_follower_watcher(pid, filepath, source) end + result = SourceDetails.get_media_attributes(source.original_url, file_listener_handler: handler) + + FileFollowerServer.stop(pid) + + result + end + + defp setup_file_follower_watcher(pid, filepath, source) do + FileFollowerServer.watch_file(pid, filepath, fn line -> + case Phoenix.json_library().decode(line) do + {:ok, media_attrs} -> + Logger.debug("FileFollowerServer Handler: Got media attributes: #{inspect(media_attrs)}") + + create_media_item_and_enqueue_download(source, media_attrs) + + err -> + Logger.debug("FileFollowerServer Handler: Error decoding JSON: #{inspect(err)}") + + err + end + end) + end + + defp create_media_item_and_enqueue_download(source, media_attrs) do + maybe_media_item = create_media_item_from_attributes(source, media_attrs) + + case maybe_media_item do + %MediaItem{} = media_item -> + if source.download_media && Media.pending_download?(media_item) do + Logger.debug("FileFollowerServer Handler: Enqueuing download task for #{inspect(media_attrs)}") + + media_item + |> Map.take([:id]) + |> VideoDownloadWorker.new() + |> Tasks.create_job_with_task(media_item) + end + + changeset -> + changeset + end + end + + defp create_media_item_from_attributes(source, media_attrs) do + attrs = %{ + source_id: source.id, + title: media_attrs["title"], + media_id: media_attrs["id"], + original_url: media_attrs["original_url"], + livestream: media_attrs["was_live"], + description: media_attrs["description"] + } + + case Media.create_media_item(attrs) do + {:ok, media_item} -> media_item + {:error, changeset} -> changeset + end + end end diff --git a/lib/pinchflat/utils/filesystem_utils.ex b/lib/pinchflat/utils/filesystem_utils.ex new file mode 100644 index 00000000..a2cfff38 --- /dev/null +++ b/lib/pinchflat/utils/filesystem_utils.ex @@ -0,0 +1,23 @@ +defmodule Pinchflat.Utils.FilesystemUtils do + @moduledoc """ + Utility methods for working with the filesystem + """ + + alias Pinchflat.Utils.StringUtils + + @doc """ + Generates a temporary file and returns its path. The file is empty and has the given type. + Generates all the directories in the path if they don't exist. + + Returns binary() + """ + def generate_metadata_tmpfile(type) do + tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory) + filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.#{type}"]) + + :ok = File.mkdir_p!(Path.dirname(filepath)) + :ok = File.write(filepath, "") + + filepath + end +end diff --git a/lib/pinchflat/utils/filesystem_utils/file_follower_server.ex b/lib/pinchflat/utils/filesystem_utils/file_follower_server.ex new file mode 100644 index 00000000..f6987e40 --- /dev/null +++ b/lib/pinchflat/utils/filesystem_utils/file_follower_server.ex @@ -0,0 +1,121 @@ +defmodule Pinchflat.Utils.FilesystemUtils.FileFollowerServer do + @moduledoc """ + A GenServer that watches a file for new lines and processes them as they come in. + This is useful for tailing log files and other similar tasks. If there's no activity + for a certain amount of time, the server will stop itself. + """ + use GenServer + + require Logger + + @poll_interval_ms Application.compile_env(:pinchflat, :file_watcher_poll_interval) + @activity_timeout_ms 60_000 + + # Client API + @doc """ + Starts the file follower server + + Returns {:ok, pid} or {:error, reason} + """ + def start_link() do + GenServer.start_link(__MODULE__, []) + end + + @doc """ + Starts the file watcher for a given filepath and handler function. + + Returns :ok + """ + def watch_file(process, filepath, handler) do + GenServer.cast(process, {:watch_file, filepath, handler}) + end + + @doc """ + Stops the file watcher and closes the file. + + Returns :ok + """ + def stop(process) do + GenServer.cast(process, :stop) + end + + # Server Callbacks + @impl true + def init(_opts) do + # Start with a blank state because, based on the common calling + # pattern for this module, we'll need a reference to the server's + # PID before we start watching any files so we can later stop the + # server gracefully. + {:ok, %{}} + end + + @impl true + def handle_cast({:watch_file, filepath, handler}, _old_state) do + {:ok, io_device} = :file.open(filepath, [:raw, :read_ahead, :binary]) + + state = %{ + io_device: io_device, + last_activity: DateTime.utc_now(), + handler: handler + } + + Process.send(self(), :read_new_lines, []) + + {:noreply, state} + end + + @impl true + def handle_cast(:stop, state) do + Logger.debug("Gracefully stopping file follower") + :file.close(state.io_device) + + {:stop, :normal, state} + end + + @impl true + def handle_info(:read_new_lines, state) do + last_activity = state.last_activity + + # If there's no new lines written for a certain amount of time, stop the server + if DateTime.diff(DateTime.utc_now(), last_activity, :millisecond) > @activity_timeout_ms do + Logger.debug("No activity for #{@activity_timeout_ms}ms. Requesting stop.") + stop(self()) + + {:noreply, state} + else + attempt_process_new_lines(state) + end + end + + defp attempt_process_new_lines(state) do + io_device = state.io_device + + # This reads one line at a time. If a line is found, it + # will be passed to the handler, we'll note the time of + # the last activity, and then we'll immediately call this + # again to read the next line. + # + # If there are no lines, it waits for the poll interval + # before trying again. + case :file.read_line(io_device) do + {:ok, line} -> + state.handler.(line) + + Process.send(self(), :read_new_lines, []) + + {:noreply, %{state | last_activity: DateTime.utc_now()}} + + :eof -> + Logger.debug("EOF reached, waiting before trying to read new lines") + Process.send_after(self(), :read_new_lines, @poll_interval_ms) + + {:noreply, state} + + {:error, reason} -> + Logger.error("Error reading file: #{reason}") + stop(self()) + + {:noreply, state} + end + end +end diff --git a/lib/pinchflat/workers/media_indexing_worker.ex b/lib/pinchflat/workers/media_indexing_worker.ex index ae54ac13..d2ff6766 100644 --- a/lib/pinchflat/workers/media_indexing_worker.ex +++ b/lib/pinchflat/workers/media_indexing_worker.ex @@ -43,13 +43,14 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do case {source.index_frequency_minutes, source.last_indexed_at} do {index_freq, _} when index_freq > 0 -> # If the indexing is on a schedule simply run indexing and reschedule - index_media(source) + SourceTasks.index_and_enqueue_download_for_media_items(source) reschedule_indexing(source) {_, nil} -> # If the source has never been indexed, index it once # even if it's not meant to reschedule - index_media(source) + SourceTasks.index_and_enqueue_download_for_media_items(source) + :ok _ -> # If the source HAS been indexed and is not meant to reschedule, @@ -58,12 +59,6 @@ defmodule Pinchflat.Workers.MediaIndexingWorker do end end - defp index_media(source) do - SourceTasks.index_media_items(source) - # This method handles the case where a source is set to not download media - SourceTasks.enqueue_pending_media_tasks(source) - end - defp reschedule_indexing(source) do source |> Map.take([:id]) diff --git a/test/pinchflat/media_client/backends/yt_dlp/command_runner_test.exs b/test/pinchflat/media_client/backends/yt_dlp/command_runner_test.exs index d63bfccf..1e80a656 100644 --- a/test/pinchflat/media_client/backends/yt_dlp/command_runner_test.exs +++ b/test/pinchflat/media_client/backends/yt_dlp/command_runner_test.exs @@ -10,7 +10,7 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunnerTest do on_exit(&reset_executable/0) end - describe "run/2" do + describe "run/4" do test "it returns the output and status when the command succeeds" do assert {:ok, _output} = Runner.run(@video_url, [], "") end @@ -57,6 +57,12 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunnerTest do assert {:error, "", 1} = Runner.run(@video_url, [], "") end) end + + test "optionally lets you specify an output_filepath" do + assert {:ok, output} = Runner.run(@video_url, [], "%(id)s", output_filepath: "/tmp/yt-dlp-output.json") + + assert String.contains?(output, "--print-to-file %(id)s /tmp/yt-dlp-output.json") + end end defp wrap_executable(new_executable, fun) do diff --git a/test/pinchflat/media_client/backends/yt_dlp/video_collection_test.exs b/test/pinchflat/media_client/backends/yt_dlp/video_collection_test.exs index 1eae92af..71193c22 100644 --- a/test/pinchflat/media_client/backends/yt_dlp/video_collection_test.exs +++ b/test/pinchflat/media_client/backends/yt_dlp/video_collection_test.exs @@ -11,14 +11,16 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollectionTest do describe "get_media_attributes/2" do test "returns a list of video attributes with no blank elements" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture() <> "\n\n"} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture() <> "\n\n"} + end) assert {:ok, [%{"id" => "video1"}, %{"id" => "video2"}, %{"id" => "video3"}]} = VideoCollection.get_media_attributes(@channel_url) end test "it passes the expected default args" do - expect(YtDlpRunnerMock, :run, fn _url, opts, ot -> + expect(YtDlpRunnerMock, :run, fn _url, opts, ot, _addl_opts -> assert opts == [:simulate, :skip_download] assert ot == "%(.{id,title,was_live,original_url,description})j" @@ -28,20 +30,35 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollectionTest do assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url) end - test "it passes the expected custom args" do - expect(YtDlpRunnerMock, :run, fn _url, opts, _ot -> - assert opts == [:custom_arg, :simulate, :skip_download] + test "returns the error straight through when the command fails" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:error, "Big issue", 1} end) + + assert {:error, "Big issue", 1} = VideoCollection.get_media_attributes(@channel_url) + end + + test "passes the explict tmpfile path to runner" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts -> + assert [{:output_filepath, filepath}] = addl_opts + assert String.ends_with?(filepath, ".json") {:ok, ""} end) - assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url, [:custom_arg]) + assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url) end - test "returns the error straight through when the command fails" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:error, "Big issue", 1} end) + test "supports an optional file_listener_handler that gets passed a filename" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) + current_self = self() - assert {:error, "Big issue", 1} = VideoCollection.get_media_attributes(@channel_url) + handler = fn filename -> + send(current_self, {:handler, filename}) + end + + assert {:ok, _} = VideoCollection.get_media_attributes(@channel_url, file_listener_handler: handler) + + assert_receive {:handler, filename} + assert String.ends_with?(filename, ".json") end end diff --git a/test/pinchflat/media_client/source_details_test.exs b/test/pinchflat/media_client/source_details_test.exs index 91b9871b..482c08e2 100644 --- a/test/pinchflat/media_client/source_details_test.exs +++ b/test/pinchflat/media_client/source_details_test.exs @@ -45,7 +45,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do describe "get_media_attributes/2 when passed a string" do test "it passes the expected arguments to the backend" do - expect(YtDlpRunnerMock, :run, fn @channel_url, opts, ot -> + expect(YtDlpRunnerMock, :run, fn @channel_url, opts, ot, _addl_opts -> assert opts == [:simulate, :skip_download] assert ot == "%(.{id,title,was_live,original_url,description})j" @@ -56,7 +56,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do end test "it returns a list of maps" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, source_attributes_return_fixture()} end) @@ -68,7 +68,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do test "it calls the backend with the source's collection ID" do source = source_fixture() - expect(YtDlpRunnerMock, :run, fn url, _opts, _ot -> + expect(YtDlpRunnerMock, :run, fn url, _opts, _ot, _addl_opts -> assert source.collection_id == url {:ok, source_attributes_return_fixture()} end) @@ -77,7 +77,7 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do end test "it builds options based on the source's media profile" do - expect(YtDlpRunnerMock, :run, fn _url, opts, _ot -> + expect(YtDlpRunnerMock, :run, fn _url, opts, _ot, _addl_opts -> assert opts == [:simulate, :skip_download] {:ok, ""} end) @@ -91,5 +91,22 @@ defmodule Pinchflat.MediaClient.SourceDetailsTest do source = source_fixture(media_profile_id: media_profile.id) assert {:ok, _} = SourceDetails.get_media_attributes(source) end + + test "lets you pass through an optional file_listener_handler" do + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture()} + end) + + source = source_fixture() + current_self = self() + + handler = fn filename -> + send(current_self, {:handler, filename}) + end + + assert {:ok, _} = SourceDetails.get_media_attributes(source, file_listener_handler: handler) + + assert_receive {:handler, _} + end end end diff --git a/test/pinchflat/media_test.exs b/test/pinchflat/media_test.exs index 15ca76d5..fc5521f1 100644 --- a/test/pinchflat/media_test.exs +++ b/test/pinchflat/media_test.exs @@ -215,6 +215,34 @@ defmodule Pinchflat.MediaTest do end end + describe "pending_download?/1" do + test "returns true when the media hasn't been downloaded" do + media_item = media_item_fixture(%{media_filepath: nil}) + + assert Media.pending_download?(media_item) + end + + test "returns false if the media has been downloaded" do + media_item = media_item_fixture(%{media_filepath: "/video/#{Faker.File.file_name(:video)}"}) + + refute Media.pending_download?(media_item) + end + + test "returns false if the media hasn't been downloaded but the profile doesn't DL shorts" do + source = source_fixture(%{media_profile_id: media_profile_fixture(%{shorts_behaviour: :exclude}).id}) + media_item = media_item_fixture(%{source_id: source.id, media_filepath: nil, original_url: "/shorts/"}) + + refute Media.pending_download?(media_item) + end + + test "returns false if the media hasn't been downloaded but the profile doesn't DL livestreams" do + source = source_fixture(%{media_profile_id: media_profile_fixture(%{livestream_behaviour: :exclude}).id}) + media_item = media_item_fixture(%{source_id: source.id, media_filepath: nil, livestream: true}) + + refute Media.pending_download?(media_item) + end + end + describe "search/1" do setup do media_item = diff --git a/test/pinchflat/tasks/source_tasks_test.exs b/test/pinchflat/tasks/source_tasks_test.exs index b0f90399..7f5e9882 100644 --- a/test/pinchflat/tasks/source_tasks_test.exs +++ b/test/pinchflat/tasks/source_tasks_test.exs @@ -5,6 +5,7 @@ defmodule Pinchflat.Tasks.SourceTasksTest do import Pinchflat.TasksFixtures import Pinchflat.MediaFixtures import Pinchflat.SourcesFixtures + import Pinchflat.ProfilesFixtures alias Pinchflat.Tasks alias Pinchflat.Tasks.Task @@ -43,15 +44,17 @@ defmodule Pinchflat.Tasks.SourceTasksTest do end end - describe "index_media_items/1" do + describe "index_and_enqueue_download_for_media_items/1" do setup do - stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end) + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture()} + end) {:ok, [source: source_fixture()]} end test "it creates a media_item record for each media ID returned", %{source: source} do - assert media_items = SourceTasks.index_media_items(source) + assert media_items = SourceTasks.index_and_enqueue_download_for_media_items(source) assert Enum.count(media_items) == 3 assert ["video1", "video2", "video3"] == Enum.map(media_items, & &1.media_id) @@ -63,15 +66,15 @@ defmodule Pinchflat.Tasks.SourceTasksTest do test "it attaches all media_items to the given source", %{source: source} do source_id = source.id - assert media_items = SourceTasks.index_media_items(source) + assert media_items = SourceTasks.index_and_enqueue_download_for_media_items(source) assert Enum.count(media_items) == 3 assert Enum.all?(media_items, fn %MediaItem{source_id: ^source_id} -> true end) end test "it won't duplicate media_items based on media_id and source", %{source: source} do - _first_run = SourceTasks.index_media_items(source) - _duplicate_run = SourceTasks.index_media_items(source) + _first_run = SourceTasks.index_and_enqueue_download_for_media_items(source) + _duplicate_run = SourceTasks.index_and_enqueue_download_for_media_items(source) media_items = Repo.preload(source, :media_items).media_items assert Enum.count(media_items) == 3 @@ -80,8 +83,8 @@ defmodule Pinchflat.Tasks.SourceTasksTest do test "it can duplicate media_ids for different sources", %{source: source} do other_source = source_fixture() - media_items = SourceTasks.index_media_items(source) - media_items_other_source = SourceTasks.index_media_items(other_source) + media_items = SourceTasks.index_and_enqueue_download_for_media_items(source) + media_items_other_source = SourceTasks.index_and_enqueue_download_for_media_items(other_source) assert Enum.count(media_items) == 3 assert Enum.count(media_items_other_source) == 3 @@ -91,8 +94,8 @@ defmodule Pinchflat.Tasks.SourceTasksTest do end test "it returns a list of media_items or changesets", %{source: source} do - first_run = SourceTasks.index_media_items(source) - duplicate_run = SourceTasks.index_media_items(source) + first_run = SourceTasks.index_and_enqueue_download_for_media_items(source) + duplicate_run = SourceTasks.index_and_enqueue_download_for_media_items(source) assert Enum.all?(first_run, fn %MediaItem{} -> true end) assert Enum.all?(duplicate_run, fn %Ecto.Changeset{} -> true end) @@ -101,11 +104,122 @@ defmodule Pinchflat.Tasks.SourceTasksTest do test "it updates the source's last_indexed_at field", %{source: source} do assert source.last_indexed_at == nil - SourceTasks.index_media_items(source) + SourceTasks.index_and_enqueue_download_for_media_items(source) source = Repo.reload!(source) assert DateTime.diff(DateTime.utc_now(), source.last_indexed_at) < 2 end + + test "it enqueues a job for each pending media item" do + source = source_fixture() + media_item = media_item_fixture(source_id: source.id, media_filepath: nil) + + SourceTasks.index_and_enqueue_download_for_media_items(source) + + assert_enqueued(worker: VideoDownloadWorker, args: %{"id" => media_item.id}) + end + + test "it does not attach tasks if the source is set to not download" do + source = source_fixture(download_media: false) + media_item = media_item_fixture(source_id: source.id, media_filepath: nil) + + SourceTasks.index_and_enqueue_download_for_media_items(source) + + assert [] = Tasks.list_tasks_for(:media_item_id, media_item.id) + end + end + + describe "index_and_enqueue_download_for_media_items/1 when testing file watcher" do + setup do + {:ok, [source: source_fixture()]} + end + + test "creates a new media item for everything already in the file", %{source: source} do + watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval) + + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts -> + filepath = Keyword.get(addl_opts, :output_filepath) + File.write(filepath, source_attributes_return_fixture()) + + # Need to add a delay to ensure the file watcher has time to read the file + :timer.sleep(watcher_poll_interval * 2) + # We know we're testing the file watcher since the syncronous call will only + # return an empty string (creating no records) + {:ok, ""} + end) + + assert Repo.aggregate(MediaItem, :count, :id) == 0 + SourceTasks.index_and_enqueue_download_for_media_items(source) + assert Repo.aggregate(MediaItem, :count, :id) == 3 + end + + test "enqueues a download for everything already in the file", %{source: source} do + watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval) + + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts -> + filepath = Keyword.get(addl_opts, :output_filepath) + File.write(filepath, source_attributes_return_fixture()) + + # Need to add a delay to ensure the file watcher has time to read the file + :timer.sleep(watcher_poll_interval * 2) + # We know we're testing the file watcher since the syncronous call will only + # return an empty string (creating no records) + {:ok, ""} + end) + + refute_enqueued(worker: VideoDownloadWorker) + SourceTasks.index_and_enqueue_download_for_media_items(source) + assert_enqueued(worker: VideoDownloadWorker) + end + + test "does not enqueue downloads if the source is set to not download" do + watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval) + source = source_fixture(download_media: false) + + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts -> + filepath = Keyword.get(addl_opts, :output_filepath) + File.write(filepath, source_attributes_return_fixture()) + + # Need to add a delay to ensure the file watcher has time to read the file + :timer.sleep(watcher_poll_interval * 2) + # We know we're testing the file watcher since the syncronous call will only + # return an empty string (creating no records) + {:ok, ""} + end) + + SourceTasks.index_and_enqueue_download_for_media_items(source) + refute_enqueued(worker: VideoDownloadWorker) + end + + test "does not enqueue downloads for media that doesn't match the profile's format options" do + watcher_poll_interval = Application.get_env(:pinchflat, :file_watcher_poll_interval) + profile = media_profile_fixture(%{shorts_behaviour: :exclude}) + source = source_fixture(%{media_profile_id: profile.id}) + + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, addl_opts -> + filepath = Keyword.get(addl_opts, :output_filepath) + + contents = + Phoenix.json_library().encode!(%{ + id: "video2", + title: "Video 2", + original_url: "https://example.com/shorts/video2", + was_live: true, + description: "desc2" + }) + + File.write(filepath, contents) + + # Need to add a delay to ensure the file watcher has time to read the file + :timer.sleep(watcher_poll_interval * 2) + # We know we're testing the file watcher since the syncronous call will only + # return an empty string (creating no records) + {:ok, ""} + end) + + SourceTasks.index_and_enqueue_download_for_media_items(source) + refute_enqueued(worker: VideoDownloadWorker) + end end describe "enqueue_pending_media_tasks/1" do diff --git a/test/pinchflat/utils/filesystem_utils/file_follower_server_test.exs b/test/pinchflat/utils/filesystem_utils/file_follower_server_test.exs new file mode 100644 index 00000000..eb5aafc7 --- /dev/null +++ b/test/pinchflat/utils/filesystem_utils/file_follower_server_test.exs @@ -0,0 +1,52 @@ +defmodule Pinchflat.Utils.FilesystemUtils.FileFollowerServerTest do + use ExUnit.Case, async: true + + alias alias Pinchflat.Utils.FilesystemUtils + alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer + + setup do + {:ok, pid} = FileFollowerServer.start_link() + tmpfile = FilesystemUtils.generate_metadata_tmpfile(:txt) + + {:ok, %{pid: pid, tmpfile: tmpfile}} + end + + describe "watch_file" do + test "calls the handler for each existing line in the file", %{pid: pid, tmpfile: tmpfile} do + File.write!(tmpfile, "line1\nline2") + parent = self() + + handler = fn line -> send(parent, line) end + FileFollowerServer.watch_file(pid, tmpfile, handler) + + assert_receive "line1\n" + assert_receive "line2" + end + + test "calls the handler for each new line in the file", %{pid: pid, tmpfile: tmpfile} do + parent = self() + file = File.open!(tmpfile, [:append]) + handler = fn line -> send(parent, line) end + + FileFollowerServer.watch_file(pid, tmpfile, handler) + + IO.binwrite(file, "line1\n") + assert_receive "line1\n" + IO.binwrite(file, "line2") + assert_receive "line2" + end + end + + describe "stop" do + test "stops the watcher", %{pid: pid, tmpfile: tmpfile} do + handler = fn _line -> :noop end + FileFollowerServer.watch_file(pid, tmpfile, handler) + + refute is_nil(Process.info(pid)) + FileFollowerServer.stop(pid) + # Gotta wait for the server to stop async + :timer.sleep(10) + assert is_nil(Process.info(pid)) + end + end +end diff --git a/test/pinchflat/utils/filesystem_utils_test.exs b/test/pinchflat/utils/filesystem_utils_test.exs new file mode 100644 index 00000000..709d2fdb --- /dev/null +++ b/test/pinchflat/utils/filesystem_utils_test.exs @@ -0,0 +1,16 @@ +defmodule Pinchflat.Utils.FilesystemUtilsTest do + use ExUnit.Case, async: true + + alias Pinchflat.Utils.FilesystemUtils + + describe "generate_metadata_tmpfile/1" do + test "creates a tmpfile and returns its path" do + res = FilesystemUtils.generate_metadata_tmpfile(:json) + + assert String.ends_with?(res, ".json") + assert File.exists?(res) + + File.rm!(res) + end + end +end diff --git a/test/pinchflat/workers/media_indexing_worker_test.exs b/test/pinchflat/workers/media_indexing_worker_test.exs index 9fdc00a3..e9b3fc4c 100644 --- a/test/pinchflat/workers/media_indexing_worker_test.exs +++ b/test/pinchflat/workers/media_indexing_worker_test.exs @@ -13,7 +13,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do describe "perform/1" do test "it indexes the source if it should be indexed" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: 10) @@ -21,7 +21,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it indexes the source no matter what if the source has never been indexed before" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: 0, last_indexed_at: nil) @@ -29,7 +29,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it does not do any indexing if the source has been indexed and shouldn't be rescheduled" do - expect(YtDlpRunnerMock, :run, 0, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, 0, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: -1, last_indexed_at: DateTime.utc_now()) @@ -37,7 +37,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it does not reschedule if the source shouldn't be indexed" do - stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) + stub(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: -1) perform_job(MediaIndexingWorker, %{id: source.id}) @@ -46,7 +46,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it kicks off a download job for each pending media item" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture()} + end) source = source_fixture(index_frequency_minutes: 10) perform_job(MediaIndexingWorker, %{id: source.id}) @@ -55,7 +57,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it starts a job for any pending media item even if it's from another run" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture()} + end) source = source_fixture(index_frequency_minutes: 10) media_item_fixture(%{source_id: source.id, media_filepath: nil}) @@ -65,7 +69,9 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it does not kick off a job for media items that could not be saved" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> + {:ok, source_attributes_return_fixture()} + end) source = source_fixture(index_frequency_minutes: 10) media_item_fixture(%{source_id: source.id, media_filepath: nil, media_id: "video1"}) @@ -76,7 +82,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it reschedules the job based on the index frequency" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: 10) perform_job(MediaIndexingWorker, %{id: source.id}) @@ -89,7 +95,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it creates a task for the rescheduled job" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, ""} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, ""} end) source = source_fixture(index_frequency_minutes: 10) task_count_fetcher = fn -> Enum.count(Tasks.list_tasks()) end @@ -100,7 +106,7 @@ defmodule Pinchflat.Workers.MediaIndexingWorkerTest do end test "it creates the basic media_item records" do - expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot -> {:ok, source_attributes_return_fixture()} end) + expect(YtDlpRunnerMock, :run, fn _url, _opts, _ot, _addl_opts -> {:ok, source_attributes_return_fixture()} end) source = source_fixture(index_frequency_minutes: 10)