Streaming media item creation during indexing (#49)
* Implemented streaming during indexing

* Updated file watcher to enqueue download; refactored download worker methods

* Updated File Follower Server timeout
kieraneglin authored Mar 5, 2024
1 parent b370c97 commit f55cdc8
Showing 22 changed files with 613 additions and 98 deletions.
2 changes: 2 additions & 0 deletions .iex.exs
@@ -15,6 +15,8 @@ alias Pinchflat.Sources
alias Pinchflat.MediaClient.{SourceDetails, VideoDownloader}
alias Pinchflat.Metadata.{Zipper, ThumbnailFetcher}

alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer

defmodule IexHelpers do
def playlist_url do
"https://www.youtube.com/playlist?list=PLmqC3wPkeL8kSlTCcSMDD63gmSi7evcXS"
3 changes: 2 additions & 1 deletion config/config.exs
@@ -20,7 +20,8 @@ config :pinchflat,
# Setting AUTH_USERNAME and AUTH_PASSWORD implies you want to use basic auth.
# If either is unset, basic auth will not be used.
basic_auth_username: System.get_env("AUTH_USERNAME"),
basic_auth_password: System.get_env("AUTH_PASSWORD")
basic_auth_password: System.get_env("AUTH_PASSWORD"),
file_watcher_poll_interval: 1000

# Configures the endpoint
config :pinchflat, PinchflatWeb.Endpoint,
3 changes: 2 additions & 1 deletion config/test.exs
@@ -5,7 +5,8 @@ config :pinchflat,
yt_dlp_executable: Path.join([File.cwd!(), "/test/support/scripts/yt-dlp-mocks/repeater.sh"]),
media_directory: Path.join([System.tmp_dir!(), "test", "videos"]),
metadata_directory: Path.join([System.tmp_dir!(), "test", "metadata"]),
tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"])
tmpfile_directory: Path.join([System.tmp_dir!(), "test", "tmpfiles"]),
file_watcher_poll_interval: 50

config :pinchflat, Oban, testing: :manual

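The new file_watcher_poll_interval key (1000 ms in the main config, 50 ms under test) is presumably what the file follower uses to decide how often to re-read the watched file. A minimal sketch of how such a value could be consumed, assuming a GenServer-based watcher; the schedule_next_poll/1 helper is illustrative and not part of this commit:

# Sketch only: reads the configured poll interval and schedules the next check.
defmodule PollIntervalSketch do
  @default_interval_ms 1_000

  def poll_interval_ms do
    Application.get_env(:pinchflat, :file_watcher_poll_interval, @default_interval_ms)
  end

  # Hypothetical helper: a watcher process would call this after each read pass.
  def schedule_next_poll(pid \\ self()) do
    Process.send_after(pid, :poll, poll_interval_ms())
  end
end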
18 changes: 18 additions & 0 deletions lib/pinchflat/media.ex
@@ -66,6 +66,24 @@ defmodule Pinchflat.Media do
|> Repo.all()
end

@doc """
For a given media_item, tells you if it is pending download. This is defined as
the media_item having a `media_filepath` of `nil` and matching the format selection
rules of the parent media_profile.
Intentionally does not take the `download_media` setting of the source into account.
Returns boolean()
"""
def pending_download?(%MediaItem{} = media_item) do
media_profile = Repo.preload(media_item, source: :media_profile).source.media_profile

MediaItem
|> where([mi], mi.id == ^media_item.id and is_nil(mi.media_filepath))
|> where(^build_format_clauses(media_profile))
|> Repo.exists?()
end

@doc """
Returns a list of media_items that match the search term. Adds a `matching_search_term`
virtual field to the result set.
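For context, a small usage sketch of the new pending_download?/1 check added above; the media item lookup is illustrative only:

# Sketch: checking whether an indexed media item still needs to be downloaded.
alias Pinchflat.{Media, Repo}
alias Pinchflat.Media.MediaItem

media_item = Repo.get!(MediaItem, 123)

if Media.pending_download?(media_item) do
  # No media_filepath yet and it matches the parent media_profile's format rules
  IO.puts("pending download")
else
  IO.puts("already downloaded or excluded by the media profile")
end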
1 change: 1 addition & 0 deletions lib/pinchflat/media_client/backends/backend_command_runner.ex
@@ -4,4 +4,5 @@ defmodule Pinchflat.MediaClient.Backends.BackendCommandRunner do
"""

@callback run(binary(), keyword(), binary()) :: {:ok, binary()} | {:error, binary(), integer()}
@callback run(binary(), keyword(), binary(), keyword()) :: {:ok, binary()} | {:error, binary(), integer()}
end
29 changes: 10 additions & 19 deletions lib/pinchflat/media_client/backends/yt_dlp/command_runner.ex
@@ -6,6 +6,7 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
require Logger

alias Pinchflat.Utils.StringUtils
alias Pinchflat.Utils.FilesystemUtils, as: FSUtils
alias Pinchflat.MediaClient.Backends.BackendCommandRunner

@behaviour BackendCommandRunner
@@ -15,19 +16,20 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
a file and then returns its contents because yt-dlp will return warnings
to stdout even if the command is successful, but these will break JSON parsing.
Returns {:ok, binary()} | {:error, output, status}.
Additional Opts:
- :output_filepath - the path to save the output to. If not provided, a temporary
file will be created and used. Useful if you need a reference to the file
for a file watcher.
IDEA: Indexing takes a long time, but the output is actually streamed to stdout.
Maybe we could listen to that stream instead so we can index videos as they're discovered.
See: https://stackoverflow.com/a/49061086/5665799
Returns {:ok, binary()} | {:error, output, status}.
"""
@impl BackendCommandRunner
def run(url, command_opts, output_template) do
def run(url, command_opts, output_template, addl_opts \\ []) do
command = backend_executable()
# These must stay in exactly this order, hence why I'm giving it its own variable.
# Also, can't use RAM file since yt-dlp needs a concrete filepath.
json_output_path = generate_json_output_path()
print_to_file_opts = [{:print_to_file, output_template}, json_output_path]
output_filepath = Keyword.get(addl_opts, :output_filepath, FSUtils.generate_metadata_tmpfile(:json))
print_to_file_opts = [{:print_to_file, output_template}, output_filepath]
formatted_command_opts = [url] ++ parse_options(command_opts ++ print_to_file_opts)

Logger.info("[yt-dlp] called with: #{Enum.join(formatted_command_opts, " ")}")
@@ -36,24 +38,13 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.CommandRunner do
{_, 0} ->
# IDEA: consider deleting the file after reading it
# (even on error? especially on error?)
File.read(json_output_path)
File.read(output_filepath)

{output, status} ->
{:error, output, status}
end
end

defp generate_json_output_path do
tmpfile_directory = Application.get_env(:pinchflat, :tmpfile_directory)
filepath = Path.join([tmpfile_directory, "#{StringUtils.random_string(64)}.json"])

# Ensure the file can be created and written to BEFORE we run the `yt-dlp` command
:ok = File.mkdir_p!(Path.dirname(filepath))
:ok = File.write(filepath, "")

filepath
end

# We want to satisfy the following behaviours:
#
# 1. If the key is an atom, convert it to a string and convert it to kebab case (for convenience)
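A hedged sketch of invoking run/4 with the new :output_filepath option so the caller holds the path a watcher can follow; the URL and temp path below are illustrative:

# Sketch: passing an explicit output file instead of letting the runner create one.
alias Pinchflat.MediaClient.Backends.YtDlp.CommandRunner

output_filepath = Path.join(System.tmp_dir!(), "indexing_output.json")
output_template = "%(.{id,title,was_live,original_url,description})j"

case CommandRunner.run(
       "https://www.youtube.com/playlist?list=PL_example",
       [:simulate, :skip_download],
       output_template,
       output_filepath: output_filepath
     ) do
  {:ok, output} -> IO.puts("yt-dlp wrote #{byte_size(output)} bytes of JSON lines")
  {:error, output, status} -> IO.puts("yt-dlp exited with #{status}: #{output}")
end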
20 changes: 17 additions & 3 deletions lib/pinchflat/media_client/backends/yt_dlp/video_collection.ex
@@ -4,19 +4,33 @@ defmodule Pinchflat.MediaClient.Backends.YtDlp.VideoCollection do
videos (aka: a source [ie: channels, playlists]).
"""

require Logger

alias Pinchflat.Utils.FunctionUtils
alias Pinchflat.Utils.FilesystemUtils

@doc """
Returns a list of maps representing the videos in the collection.
Options:
- :file_listener_handler - a function that will be called with the path to the
file that will be written to when yt-dlp is done. This is useful for
setting up a file watcher to know when the file is ready to be read.
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes(url, command_opts \\ []) do
def get_media_attributes(url, addl_opts \\ []) do
runner = Application.get_env(:pinchflat, :yt_dlp_runner)
opts = command_opts ++ [:simulate, :skip_download]
command_opts = [:simulate, :skip_download]
output_template = "%(.{id,title,was_live,original_url,description})j"
output_filepath = FilesystemUtils.generate_metadata_tmpfile(:json)
file_listener_handler = Keyword.get(addl_opts, :file_listener_handler, false)

if file_listener_handler do
file_listener_handler.(output_filepath)
end

case runner.run(url, opts, output_template) do
case runner.run(url, command_opts, output_template, output_filepath: output_filepath) do
{:ok, output} ->
output
|> String.split("\n", trim: true)
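A sketch of how a caller might pass :file_listener_handler to get_media_attributes/2 above so it learns the output path up front; the handler body here only logs, whereas the real handler in this commit starts a FileFollowerServer watch:

# Sketch: receiving the output filepath before yt-dlp starts writing to it.
alias Pinchflat.MediaClient.Backends.YtDlp.VideoCollection

handler = fn filepath ->
  IO.puts("yt-dlp will stream JSON lines into #{filepath}")
end

{:ok, videos} =
  VideoCollection.get_media_attributes(
    "https://www.youtube.com/@example_channel",
    file_listener_handler: handler
  )

IO.puts("indexed #{length(videos)} videos")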
15 changes: 10 additions & 5 deletions lib/pinchflat/media_client/source_details.ex
@@ -22,16 +22,21 @@ defmodule Pinchflat.MediaClient.SourceDetails do
Returns a list of basic video data maps for the given source URL OR
source record using the given backend.
Options:
- :file_listener_handler - a function that will be called with the path to the
file that will be written to by yt-dlp. This is useful for
setting up a file watcher to read the file as it gets written to.
Returns {:ok, [map()]} | {:error, any, ...}.
"""
def get_media_attributes(sourceable, backend \\ :yt_dlp)
def get_media_attributes(sourceable, opts \\ [], backend \\ :yt_dlp)

def get_media_attributes(%Source{} = source, backend) do
source_module(backend).get_media_attributes(source.collection_id)
def get_media_attributes(%Source{} = source, opts, backend) do
get_media_attributes(source.collection_id, opts, backend)
end

def get_media_attributes(source_url, backend) when is_binary(source_url) do
source_module(backend).get_media_attributes(source_url)
def get_media_attributes(source_url, opts, backend) when is_binary(source_url) do
source_module(backend).get_media_attributes(source_url, opts)
end

defp source_module(backend) do
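With the arity change, the options now sit between the sourceable and the backend. A brief usage sketch; the URL and handler are illustrative:

# Sketch: both clauses accept opts, with the backend still defaulting to :yt_dlp.
alias Pinchflat.MediaClient.SourceDetails

handler = fn filepath -> IO.puts("watching #{filepath}") end

# From a raw channel/playlist URL:
{:ok, _videos} =
  SourceDetails.get_media_attributes("https://www.youtube.com/@example_channel",
    file_listener_handler: handler
  )

# From a %Source{} record (delegates through source.collection_id):
# {:ok, _videos} = SourceDetails.get_media_attributes(source, [file_listener_handler: handler], :yt_dlp)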
1 change: 0 additions & 1 deletion lib/pinchflat/tasks.ex
@@ -2,7 +2,6 @@ defmodule Pinchflat.Tasks do
@moduledoc """
The Tasks context.
"""

import Ecto.Query, warn: false
alias Pinchflat.Repo

6 changes: 4 additions & 2 deletions lib/pinchflat/tasks/media_item_tasks.ex
@@ -1,7 +1,9 @@
defmodule Pinchflat.Tasks.MediaItemTasks do
@moduledoc """
This module contains methods used by or used to control tasks (aka workers)
related to media items.
Contains methods used by OR used to create/manage tasks for media items.
Tasks/workers are meant to be thin wrappers so most of the actual work they
do is also defined here. Essentially, a one-stop-shop for media-related tasks/workers.
"""
alias Pinchflat.Media

132 changes: 109 additions & 23 deletions lib/pinchflat/tasks/source_tasks.ex
@@ -1,16 +1,22 @@
defmodule Pinchflat.Tasks.SourceTasks do
@moduledoc """
This module contains methods used by or used to control tasks (aka workers)
related to sources.
Contains methods used by OR used to create/manage tasks for sources.
Tasks/workers are meant to be thin wrappers so most of the actual work they
do is also defined here. Essentially, a one-stop-shop for source-related tasks/workers.
"""

require Logger

alias Pinchflat.Media
alias Pinchflat.Tasks
alias Pinchflat.Sources
alias Pinchflat.Sources.Source
alias Pinchflat.Media.MediaItem
alias Pinchflat.MediaClient.SourceDetails
alias Pinchflat.Workers.MediaIndexingWorker
alias Pinchflat.Workers.VideoDownloadWorker
alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer

@doc """
Starts tasks for indexing a source's media regardless of the source's indexing
@@ -35,30 +41,37 @@ defmodule Pinchflat.Tasks.SourceTasks do

@doc """
Given a media source, creates (indexes) the media by creating media_items for each
media ID in the source.
media ID in the source. Afterward, kicks off a download task for each pending media
item belonging to the source. You can't tell me the method name isn't descriptive!
Indexing is slow and usually returns a list of all media data at once for record creation.
To help with this, we use a file follower to watch the file that yt-dlp writes to
so we can create media items as they come in. This parallelizes the process and adds
clarity to the user experience. This has a few things to be aware of which are documented
below in the file watcher setup method.
NOTE: downloads are only enqueued if the source is set to download media. Downloads are
also enqueued for ALL pending media items, not just the ones that were indexed in this
job run. This should ensure that any stragglers are caught if, for some reason, they
weren't enqueued or somehow got de-queued.
Since indexing returns all media data EVERY TIME, we rely on the unique index of the
media_id to prevent duplicates. Due to both the file follower and the fact that future
indexing will index a lot of existing data, this method will MOSTLY return error
changesets (from the unique index violation) and not media items. This is intended.
Returns [%MediaItem{}, ...] | [%Ecto.Changeset{}, ...]
"""
def index_media_items(%Source{} = source) do
{:ok, media_attributes} = SourceDetails.get_media_attributes(source.original_url)
def index_and_enqueue_download_for_media_items(%Source{} = source) do
# See the method definition below for more info on how file watchers work
# (important reading if you're not familiar with it)
{:ok, media_attributes} = get_media_attributes_and_setup_file_watcher(source)

result = Enum.map(media_attributes, fn media_attrs -> create_media_item_from_attributes(source, media_attrs) end)
Sources.update_source(source, %{last_indexed_at: DateTime.utc_now()})
enqueue_pending_media_tasks(source)

media_attributes
|> Enum.map(fn media_attrs ->
attrs = %{
source_id: source.id,
title: media_attrs["title"],
media_id: media_attrs["id"],
original_url: media_attrs["original_url"],
livestream: media_attrs["was_live"],
description: media_attrs["description"]
}

case Media.create_media_item(attrs) do
{:ok, media_item} -> media_item
{:error, changeset} -> changeset
end
end)
result
end

@doc """
@@ -70,8 +83,6 @@ defmodule Pinchflat.Tasks.SourceTasks do
that any stragglers are caught if, for some reason, they weren't enqueued
or somehow got de-queued.
I'm not sure of a case where this would happen, but it's cheap insurance.
Returns :ok
"""
def enqueue_pending_media_tasks(%Source{download_media: true} = source) do
@@ -99,4 +110,79 @@ defmodule Pinchflat.Tasks.SourceTasks do
|> Media.list_pending_media_items_for()
|> Enum.each(&Tasks.delete_pending_tasks_for/1)
end

# The file follower is a GenServer that watches a file for new lines and
# processes them. This works well, but we have to be resilient to partially-written
# lines (ie: you should gracefully fail if you can't parse a line).
#
# This works in-tandem with the normal (blocking) media indexing behaviour. When
# the `get_media_attributes` method completes it'll return the FULL result to
# the caller for parsing. Ideally, every item in the list will have already
# been processed by the file follower, but if not, the caller handles creation
# of any media items that were missed/initially failed.
#
# It attempts a graceful shutdown of the file follower after the indexing is done,
# but the FileFollowerServer will also stop itself if it doesn't see any activity
# for a sufficiently long time.
defp get_media_attributes_and_setup_file_watcher(source) do
{:ok, pid} = FileFollowerServer.start_link()

handler = fn filepath -> setup_file_follower_watcher(pid, filepath, source) end
result = SourceDetails.get_media_attributes(source.original_url, file_listener_handler: handler)

FileFollowerServer.stop(pid)

result
end

defp setup_file_follower_watcher(pid, filepath, source) do
FileFollowerServer.watch_file(pid, filepath, fn line ->
case Phoenix.json_library().decode(line) do
{:ok, media_attrs} ->
Logger.debug("FileFollowerServer Handler: Got media attributes: #{inspect(media_attrs)}")

create_media_item_and_enqueue_download(source, media_attrs)

err ->
Logger.debug("FileFollowerServer Handler: Error decoding JSON: #{inspect(err)}")

err
end
end)
end

defp create_media_item_and_enqueue_download(source, media_attrs) do
maybe_media_item = create_media_item_from_attributes(source, media_attrs)

case maybe_media_item do
%MediaItem{} = media_item ->
if source.download_media && Media.pending_download?(media_item) do
Logger.debug("FileFollowerServer Handler: Enqueuing download task for #{inspect(media_attrs)}")

media_item
|> Map.take([:id])
|> VideoDownloadWorker.new()
|> Tasks.create_job_with_task(media_item)
end

changeset ->
changeset
end
end

defp create_media_item_from_attributes(source, media_attrs) do
attrs = %{
source_id: source.id,
title: media_attrs["title"],
media_id: media_attrs["id"],
original_url: media_attrs["original_url"],
livestream: media_attrs["was_live"],
description: media_attrs["description"]
}

case Media.create_media_item(attrs) do
{:ok, media_item} -> media_item
{:error, changeset} -> changeset
end
end
end
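To round out the picture, a self-contained sketch of the FileFollowerServer API exactly as it is used above (start, watch a file with a per-line callback, stop); the file path is illustrative and the JSON handling mirrors the private helpers in this module:

# Sketch: following a file that another process appends JSON lines to.
alias Pinchflat.Utils.FilesystemUtils.FileFollowerServer

{:ok, pid} = FileFollowerServer.start_link()

FileFollowerServer.watch_file(pid, "/tmp/indexing_output.json", fn line ->
  # Lines may be partially written, so decode failures are tolerated and returned as-is.
  case Phoenix.json_library().decode(line) do
    {:ok, attrs} -> IO.inspect(attrs, label: "media attributes")
    err -> err
  end
end)

# Stop the follower once the producer is done writing
# (it also times itself out after a long period of inactivity).
FileFollowerServer.stop(pid)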