From efb2388ef014bbeeac626a0021ad748bec6fdee9 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 31 Oct 2024 10:01:44 -0400 Subject: [PATCH 1/3] chore: Add telemetry handlers for Oban job events --- lib/arrow/application.ex | 2 ++ lib/arrow/gtfs.ex | 31 ++++++------------- lib/arrow/gtfs/job_helper.ex | 18 ++++++++++- lib/arrow/telemetry.ex | 60 ++++++++++++++++++++++++++++++++++++ lib/arrow_web/telemetry.ex | 4 +++ 5 files changed, 92 insertions(+), 23 deletions(-) create mode 100644 lib/arrow/telemetry.ex diff --git a/lib/arrow/application.ex b/lib/arrow/application.ex index 04ec8335..63de324c 100644 --- a/lib/arrow/application.ex +++ b/lib/arrow/application.ex @@ -9,6 +9,8 @@ defmodule Arrow.Application do run_adjustment_fetcher? = Application.get_env(:arrow, :fetch_adjustments?) run_migrations_at_startup? = Application.get_env(:arrow, :run_migrations_at_startup?) + Arrow.Telemetry.setup_telemetry() + # List all child processes to be supervised children = [ diff --git a/lib/arrow/gtfs.ex b/lib/arrow/gtfs.ex index e2bb0dd7..9c90dfa3 100644 --- a/lib/arrow/gtfs.ex +++ b/lib/arrow/gtfs.ex @@ -5,6 +5,7 @@ defmodule Arrow.Gtfs do require Logger alias Arrow.Gtfs.Importable + alias Arrow.Gtfs.JobHelper alias Arrow.Repo @import_timeout_ms :timer.minutes(10) @@ -24,53 +25,39 @@ defmodule Arrow.Gtfs do @spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) :: :ok | {:error, term} def import(unzip, new_version, current_version, job, dry_run? \\ false) do - Logger.info("GTFS import or validation job starting #{job_logging_params(job)}") + job_info = JobHelper.logging_params(job) + + Logger.info("GTFS import or validation job starting #{job_info}") with :ok <- validate_required_files(unzip), :ok <- validate_version_change(new_version, current_version) do case import_transaction(unzip, dry_run?) do {:ok, _} -> - Logger.info("GTFS import success #{job_logging_params(job)}") + Logger.info("GTFS import success #{job_info}") :ok {:error, :dry_run_success} -> - Logger.info("GTFS validation success #{job_logging_params(job)}") + Logger.info("GTFS validation success #{job_info}") :ok {:error, reason} = error -> - Logger.warn( - "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}" - ) + Logger.warn("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") error end else :unchanged -> - Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}") + Logger.info("GTFS import skipped due to unchanged version #{job_info}") :ok {:error, reason} = error -> - Logger.warn( - "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}" - ) + Logger.warn("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}") error end end - defp job_logging_params(job) do - s3_object_key = - job.args - |> Map.fetch!("s3_uri") - |> URI.parse() - |> then(& &1.path) - - archive_version = Map.fetch!(job.args, "archive_version") - - "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}" - end - defp import_transaction(unzip, dry_run?) do transaction = fn -> _ = truncate_all() diff --git a/lib/arrow/gtfs/job_helper.ex b/lib/arrow/gtfs/job_helper.ex index bdbe23bb..42feb842 100644 --- a/lib/arrow/gtfs/job_helper.ex +++ b/lib/arrow/gtfs/job_helper.ex @@ -20,7 +20,7 @@ defmodule Arrow.Gtfs.JobHelper do :all | :queued | :executing | :succeeded | :failed | :cancelled | :not_done | :done @doc """ - Returns details about GTFS import jobs. + Returns details about GTFS import/validation jobs in a JSON-encodable list of maps. """ @spec check_jobs(module, status_filter) :: list(map) def check_jobs(worker_mod, status_filter) do @@ -36,6 +36,22 @@ defmodule Arrow.Gtfs.JobHelper do ) end + @doc """ + Returns relevant info about an import/validation job, to be included in a log message. + """ + @spec logging_params(Oban.Job.t()) :: String.t() + def logging_params(job) do + s3_object_key = + job.args + |> Map.fetch!("s3_uri") + |> URI.parse() + |> then(& &1.path) + + archive_version = Map.fetch!(job.args, "archive_version") + + "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{inspect(job.worker)}" + end + defp job_filters do %{ all: Enum.map(Oban.Job.states(), &Atom.to_string/1), diff --git a/lib/arrow/telemetry.ex b/lib/arrow/telemetry.ex new file mode 100644 index 00000000..19b0d307 --- /dev/null +++ b/lib/arrow/telemetry.ex @@ -0,0 +1,60 @@ +defmodule Arrow.Telemetry do + @moduledoc """ + Telemetry listeners for Arrow business logic. + """ + require Logger + + @spec setup_telemetry() :: :ok + def setup_telemetry do + _ = + :telemetry.attach_many( + "oban", + [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]], + # telemetry prefers event handler to be passed as a non-local function + # capture--i.e., with module name included--for performance reasons. + &Arrow.Telemetry.handle_event/4, + [] + ) + + :ok + end + + def handle_event(event, measures, meta, config) + + def handle_event([:oban, :job, :start], _measures, meta, _config) do + Logger.info("Oban job started #{get_job_info(meta.job)}") + end + + def handle_event([:oban, :job, :stop], measures, meta, _config) do + Logger.info( + "Oban job stopped #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time}" + ) + end + + def handle_event([:oban, :job, :exception], measures, meta, _config) do + details = + case meta.kind do + :error -> + message = Exception.message(meta.reason) + full_details = Exception.format(meta.kind, meta.reason, meta.stacktrace) + "message=#{message}\n#{full_details}" + + _other -> + "\n#{Exception.format(meta.kind, meta.reason, meta.stacktrace)}" + end + + Logger.warn( + "Oban job exception #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time} #{details}" + ) + end + + @gtfs_workers [inspect(Arrow.Gtfs.ImportWorker), inspect(Arrow.Gtfs.ValidationWorker)] + + defp get_job_info(%Oban.Job{worker: worker} = job) when worker in @gtfs_workers do + Arrow.Gtfs.JobHelper.logging_params(job) + end + + defp get_job_info(job) do + "job_id=#{job.id} job_args=#{inspect(job.args)} job_worker=#{inspect(job.worker)}" + end +end diff --git a/lib/arrow_web/telemetry.ex b/lib/arrow_web/telemetry.ex index 03c1ee87..ed284447 100644 --- a/lib/arrow_web/telemetry.ex +++ b/lib/arrow_web/telemetry.ex @@ -1,4 +1,8 @@ defmodule ArrowWeb.Telemetry do + @moduledoc """ + Provides data for the LiveDashboard "metrics" tab. + """ + use Supervisor import Telemetry.Metrics From 99fb485bc26cf62c5088c2039da4d41422047107 Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 31 Oct 2024 10:52:06 -0400 Subject: [PATCH 2/3] Test Oban job exception logger --- lib/arrow/gtfs/archive.ex | 2 +- lib/arrow/gtfs/job_helper.ex | 2 +- lib/arrow/telemetry.ex | 2 +- test/arrow/telemetry_test.exs | 36 +++++++++++++++++++++++++++++++++++ 4 files changed, 39 insertions(+), 3 deletions(-) create mode 100644 test/arrow/telemetry_test.exs diff --git a/lib/arrow/gtfs/archive.ex b/lib/arrow/gtfs/archive.ex index 3db98e16..4d75db2d 100644 --- a/lib/arrow/gtfs/archive.ex +++ b/lib/arrow/gtfs/archive.ex @@ -105,7 +105,7 @@ defmodule Arrow.Gtfs.Archive do get_object_op = ExAws.S3.get_object(bucket, object_key) case apply(mod, fun, [get_object_op]) do - {:ok, %{body: zip_data}} -> + {:ok, %{body: zip_data, status_code: 200}} -> zip_data |> List.wrap() |> from_iodata() diff --git a/lib/arrow/gtfs/job_helper.ex b/lib/arrow/gtfs/job_helper.ex index 42feb842..8db0772e 100644 --- a/lib/arrow/gtfs/job_helper.ex +++ b/lib/arrow/gtfs/job_helper.ex @@ -49,7 +49,7 @@ defmodule Arrow.Gtfs.JobHelper do archive_version = Map.fetch!(job.args, "archive_version") - "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{inspect(job.worker)}" + "job_id=#{job.id} archive_s3_object_key=#{inspect(s3_object_key)} archive_version=#{inspect(archive_version)} job_worker=#{inspect(job.worker)}" end defp job_filters do diff --git a/lib/arrow/telemetry.ex b/lib/arrow/telemetry.ex index 19b0d307..00eb2637 100644 --- a/lib/arrow/telemetry.ex +++ b/lib/arrow/telemetry.ex @@ -37,7 +37,7 @@ defmodule Arrow.Telemetry do :error -> message = Exception.message(meta.reason) full_details = Exception.format(meta.kind, meta.reason, meta.stacktrace) - "message=#{message}\n#{full_details}" + "message=#{inspect(message)}\n#{full_details}" _other -> "\n#{Exception.format(meta.kind, meta.reason, meta.stacktrace)}" diff --git a/test/arrow/telemetry_test.exs b/test/arrow/telemetry_test.exs new file mode 100644 index 00000000..be847d0f --- /dev/null +++ b/test/arrow/telemetry_test.exs @@ -0,0 +1,36 @@ +defmodule Arrow.ExceptionalWorker do + @moduledoc """ + Worker that raises an exception. + """ + use Oban.Worker + + @impl Oban.Worker + def perform(%Oban.Job{args: %{"arg" => arg}}) do + raise "argh! arg: #{arg}" + end +end + +defmodule Arrow.TelemetryTest do + @moduledoc false + use ExUnit.Case, async: true + use Oban.Testing, repo: Arrow.Repo + import ExUnit.CaptureLog + + describe "oban.job.exception listener" do + test "logs exception info" do + log = + capture_log([level: :warning], fn -> + try do + perform_job(Arrow.ExceptionalWorker, %{ + arg: "argyle gargoyle" + }) + rescue + _ -> nil + end + end) + + assert log =~ "Oban job exception" + assert log =~ ~s|message="argh! arg: argyle gargoyle"| + end + end +end From aad7ea03ad007e9869e123effa93105949486fcc Mon Sep 17 00:00:00 2001 From: Jon Zimbel Date: Thu, 31 Oct 2024 12:04:35 -0400 Subject: [PATCH 3/3] Logger.warn -> Logger.warning --- lib/arrow/telemetry.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/arrow/telemetry.ex b/lib/arrow/telemetry.ex index 00eb2637..4587471c 100644 --- a/lib/arrow/telemetry.ex +++ b/lib/arrow/telemetry.ex @@ -43,7 +43,7 @@ defmodule Arrow.Telemetry do "\n#{Exception.format(meta.kind, meta.reason, meta.stacktrace)}" end - Logger.warn( + Logger.warning( "Oban job exception #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time} #{details}" ) end