chore: Add telemetry handlers for Oban job events #1029

Merged · 4 commits · Oct 31, 2024
2 changes: 2 additions & 0 deletions lib/arrow/application.ex
@@ -9,6 +9,8 @@ defmodule Arrow.Application do
     run_adjustment_fetcher? = Application.get_env(:arrow, :fetch_adjustments?)
     run_migrations_at_startup? = Application.get_env(:arrow, :run_migrations_at_startup?)
 
+    Arrow.Telemetry.setup_telemetry()
+
     # List all child processes to be supervised
     children =
       [
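
Since Arrow.Telemetry.setup_telemetry/0 runs before the supervision tree is started, the handlers are attached before any Oban job can execute. As a quick sketch (e.g., from IEx), the attachment can be confirmed with :telemetry.list_handlers/1:

    # :telemetry.list_handlers/1 takes an event-name prefix and returns the
    # specs of all handlers attached under it.
    :telemetry.list_handlers([:oban, :job])
    # => [%{id: "oban", event_name: [:oban, :job, :start], ...}, ...]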
31 changes: 9 additions & 22 deletions lib/arrow/gtfs.ex
@@ -5,6 +5,7 @@ defmodule Arrow.Gtfs do
 
   require Logger
   alias Arrow.Gtfs.Importable
+  alias Arrow.Gtfs.JobHelper
   alias Arrow.Repo
 
   @import_timeout_ms :timer.minutes(10)
@@ -24,53 +25,39 @@
   @spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) ::
           :ok | {:error, term}
   def import(unzip, new_version, current_version, job, dry_run? \\ false) do
-    Logger.info("GTFS import or validation job starting #{job_logging_params(job)}")
+    job_info = JobHelper.logging_params(job)
+
+    Logger.info("GTFS import or validation job starting #{job_info}")
 
     with :ok <- validate_required_files(unzip),
          :ok <- validate_version_change(new_version, current_version) do
       case import_transaction(unzip, dry_run?) do
         {:ok, _} ->
-          Logger.info("GTFS import success #{job_logging_params(job)}")
+          Logger.info("GTFS import success #{job_info}")
           :ok
 
         {:error, :dry_run_success} ->
-          Logger.info("GTFS validation success #{job_logging_params(job)}")
+          Logger.info("GTFS validation success #{job_info}")
           :ok
 
         {:error, reason} = error ->
-          Logger.warning(
-            "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
-          )
+          Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")
 
           error
       end
     else
       :unchanged ->
-        Logger.info("GTFS import skipped due to unchanged version #{job_logging_params(job)}")
+        Logger.info("GTFS import skipped due to unchanged version #{job_info}")
 
         :ok
 
       {:error, reason} = error ->
-        Logger.warning(
-          "GTFS import or validation failed #{job_logging_params(job)} reason=#{inspect(reason)}"
-        )
+        Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")
 
         error
     end
   end
 
-  defp job_logging_params(job) do
-    s3_object_key =
-      job.args
-      |> Map.fetch!("s3_uri")
-      |> URI.parse()
-      |> then(& &1.path)
-
-    archive_version = Map.fetch!(job.args, "archive_version")
-
-    "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}"
-  end
-
   defp import_transaction(unzip, dry_run?) do
     transaction = fn ->
       _ = truncate_all()
2 changes: 1 addition & 1 deletion lib/arrow/gtfs/archive.ex
@@ -105,7 +105,7 @@ defmodule Arrow.Gtfs.Archive do
     get_object_op = ExAws.S3.get_object(bucket, object_key)
 
     case apply(mod, fun, [get_object_op]) do
-      {:ok, %{body: zip_data}} ->
+      {:ok, %{body: zip_data, status_code: 200}} ->
        zip_data
        |> List.wrap()
        |> from_iodata()
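
For context, a successful ExAws request for a GetObject operation resolves to an {:ok, map} tuple whose map carries the response body, headers, and HTTP status code; matching on status_code: 200 keeps anything other than a clean success out of the zip-parsing path. Roughly (field values below are illustrative, not taken from this PR):

    # Approximate shape of an ExAws.request/1 success for S3.get_object/2:
    {:ok, %{body: zip_bytes, headers: [_ | _], status_code: 200}}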
18 changes: 17 additions & 1 deletion lib/arrow/gtfs/job_helper.ex
@@ -20,7 +20,7 @@ defmodule Arrow.Gtfs.JobHelper do
           :all | :queued | :executing | :succeeded | :failed | :cancelled | :not_done | :done
 
   @doc """
-  Returns details about GTFS import jobs.
+  Returns details about GTFS import/validation jobs in a JSON-encodable list of maps.
   """
   @spec check_jobs(module, status_filter) :: list(map)
   def check_jobs(worker_mod, status_filter) do
@@ -36,6 +36,22 @@
     )
   end
 
+  @doc """
+  Returns relevant info about an import/validation job, to be included in a log message.
+  """
+  @spec logging_params(Oban.Job.t()) :: String.t()
+  def logging_params(job) do
+    s3_object_key =
+      job.args
+      |> Map.fetch!("s3_uri")
+      |> URI.parse()
+      |> then(& &1.path)
+
+    archive_version = Map.fetch!(job.args, "archive_version")
+
+    "job_id=#{job.id} archive_s3_object_key=#{inspect(s3_object_key)} archive_version=#{inspect(archive_version)} job_worker=#{inspect(job.worker)}"
+  end
+
   defp job_filters do
     %{
       all: Enum.map(Oban.Job.states(), &Atom.to_string/1),
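
For illustration, here is roughly what logging_params/1 produces for a hypothetical job (all argument values below are made up):

    job = %Oban.Job{
      id: 42,
      worker: "Arrow.Gtfs.ImportWorker",
      args: %{
        "s3_uri" => "s3://example-bucket/gtfs/archive.zip",
        "archive_version" => "2024-10-31"
      }
    }

    Arrow.Gtfs.JobHelper.logging_params(job)
    # => "job_id=42 archive_s3_object_key=\"/gtfs/archive.zip\" archive_version=\"2024-10-31\" job_worker=\"Arrow.Gtfs.ImportWorker\""

Wrapping each value in inspect/1 quotes strings, so empty or nil values stand out in the log line.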
60 changes: 60 additions & 0 deletions lib/arrow/telemetry.ex
@@ -0,0 +1,60 @@
+defmodule Arrow.Telemetry do
+  @moduledoc """
+  Telemetry listeners for Arrow business logic.
+  """
+  require Logger
+
+  @spec setup_telemetry() :: :ok
+  def setup_telemetry do
+    _ =
+      :telemetry.attach_many(
+        "oban",
+        [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]],
+        # telemetry prefers event handler to be passed as a non-local function
+        # capture--i.e., with module name included--for performance reasons.
+        &Arrow.Telemetry.handle_event/4,
+        []
+      )
+
+    :ok
+  end
+
+  def handle_event(event, measures, meta, config)
+
+  def handle_event([:oban, :job, :start], _measures, meta, _config) do
+    Logger.info("Oban job started #{get_job_info(meta.job)}")
+  end
+
+  def handle_event([:oban, :job, :stop], measures, meta, _config) do
+    Logger.info(
+      "Oban job stopped #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time}"
+    )
+  end
+
+  def handle_event([:oban, :job, :exception], measures, meta, _config) do
+    details =
+      case meta.kind do
+        :error ->
+          message = Exception.message(meta.reason)
+          full_details = Exception.format(meta.kind, meta.reason, meta.stacktrace)
+          "message=#{inspect(message)}\n#{full_details}"
+
+        _other ->
+          "\n#{Exception.format(meta.kind, meta.reason, meta.stacktrace)}"
+      end
+
+    Logger.warning(
+      "Oban job exception #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time} #{details}"
+    )
+  end
+
+  @gtfs_workers [inspect(Arrow.Gtfs.ImportWorker), inspect(Arrow.Gtfs.ValidationWorker)]
+
+  defp get_job_info(%Oban.Job{worker: worker} = job) when worker in @gtfs_workers do
+    Arrow.Gtfs.JobHelper.logging_params(job)
+  end
+
+  defp get_job_info(job) do
+    "job_id=#{job.id} job_args=#{inspect(job.args)} job_worker=#{inspect(job.worker)}"
+  end
+end
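
The handlers can be exercised without a running queue by emitting an event by hand with :telemetry.execute/3; a minimal sketch (the worker name and args are invented):

    job = %Oban.Job{id: 1, worker: "SomeApp.SomeWorker", args: %{"n" => 1}}

    :telemetry.execute(
      [:oban, :job, :start],
      %{system_time: System.system_time()},
      %{job: job}
    )
    # Logs via the fallback get_job_info/1 clause:
    # "Oban job started job_id=1 job_args=%{\"n\" => 1} job_worker=\"SomeApp.SomeWorker\""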
4 changes: 4 additions & 0 deletions lib/arrow_web/telemetry.ex
@@ -1,4 +1,8 @@
 defmodule ArrowWeb.Telemetry do
+  @moduledoc """
+  Provides data for the LiveDashboard "metrics" tab.
+  """
+
   use Supervisor
   import Telemetry.Metrics
 
36 changes: 36 additions & 0 deletions test/arrow/telemetry_test.exs
@@ -0,0 +1,36 @@
+defmodule Arrow.ExceptionalWorker do
+  @moduledoc """
+  Worker that raises an exception.
+  """
+  use Oban.Worker
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{args: %{"arg" => arg}}) do
+    raise "argh! arg: #{arg}"
+  end
+end
+
+defmodule Arrow.TelemetryTest do
+  @moduledoc false
+  use ExUnit.Case, async: true
+  use Oban.Testing, repo: Arrow.Repo
+  import ExUnit.CaptureLog
+
+  describe "oban.job.exception listener" do
+    test "logs exception info" do
+      log =
+        capture_log([level: :warning], fn ->
+          try do
+            perform_job(Arrow.ExceptionalWorker, %{
+              arg: "argyle gargoyle"
+            })
+          rescue
+            _ -> nil
+          end
+        end)
+
+      assert log =~ "Oban job exception"
+      assert log =~ ~s|message="argh! arg: argyle gargoyle"|
+    end
+  end
+end