Skip to content

Commit

Permalink
Observability via Prometheus Metrics (#92)
Browse files Browse the repository at this point in the history
Closes #57

---------

Co-authored-by: Benjamin Smith <[email protected]>
  • Loading branch information
mooster531 and bh2smith authored Nov 27, 2024
1 parent d9a7e65 commit 224de6e
Show file tree
Hide file tree
Showing 13 changed files with 229 additions and 11 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ jobs:
# Copy repo contents into container (needed to populate DB)
volumes:
- ${{ github.workspace }}:/repo
pushgateway:
image: prom/pushgateway
options: >-
--health-cmd="wget --spider --quiet http://localhost:9091/metrics || exit 1"
--health-interval=10s
--health-timeout=5s
--health-retries=5
ports:
- "9091:9091"
steps:
- uses: actions/checkout@v4
- name: Setup Python
Expand All @@ -35,6 +44,7 @@ jobs:
# Environment variables used by the `pg_client.py`
env:
DB_URL: postgresql://postgres:postgres@localhost:5432/postgres
PROMETHEUS_PUSHGATEWAY_URL: http://localhost:9091
- name: Upload coverage report
uses: actions/upload-artifact@v4
with:
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ __pycache__/
.mypy_cache
.coverage
coverage.xml

# Metrics
grafana
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ Copy `.env.sample` to `.env` and fill out the two required variables
- `DUNE_API_KEY` - Valid API key for [Dune](https://dune.com/)
- `DB_URL` - Connection string for the source and/or destination PostgreSQL database,
in the form `postgresql://postgres:postgres@localhost:5432/postgres`
- (Optional) `PROMETHEUS_PUSHGATEWAY_URL` - URL of a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) which receives job-related metrics.

### Mount the config and .env files into the container and run the script

Expand Down
48 changes: 47 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,52 @@ services:
- "5432:5432"
volumes:
- pg_data:/var/lib/postgresql/data
prometheus:
image: prom/prometheus
container_name: prometheus
volumes:
- ./prometheus/:/etc/prometheus/
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/config.yml'
- '--storage.tsdb.path=/prometheus'
- '--web.console.libraries=/etc/prometheus/console_libraries'
- '--web.console.templates=/etc/prometheus/consoles'
- '--web.enable-lifecycle'
ports:
- "9090:9090"
networks:
- monitor-net

pushgateway:
image: prom/pushgateway
container_name: pushgateway
expose:
- 9091
ports:
- "9091:9091"
networks:
- monitor-net

grafana:
image: grafana/grafana
container_name: grafana
volumes:
- ./grafana/data:/var/lib/grafana
- ./grafana/datasources:/etc/grafana/datasources
- ./grafana/dashboards:/etc/grafana/dashboards
environment:
- GF_SECURITY_ADMIN_USER=${ADMIN_USER:-admin}
- GF_SECURITY_ADMIN_PASSWORD=${ADMIN_PASSWORD:-admin}
- GF_USERS_ALLOW_SIGN_UP=false
ports:
- "3000:3000"
networks:
- monitor-net
user: "1000"
volumes:
pg_data:
prometheus_data:
networks:
monitor-net:
driver: bridge
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions prometheus/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
global:
  # How often targets are scraped and rules are evaluated.
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Prometheus scraping its own /metrics endpoint.
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

  # Pushgateway receiving job metrics pushed by dune-sync (src/metrics.py).
  # honor_labels keeps the pushed `job`/`instance` labels instead of
  # overwriting them with the scrape target's labels — required for
  # pushgateway scraping per the Pushgateway documentation.
  - job_name: 'pushgateway'
    honor_labels: true
    static_configs:
      - targets: ['pushgateway:9091']
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ python-dotenv = "*"
psycopg2-binary = "*"
tomli = "*"
pyyaml = "*"
prometheus-client = "*"

[tool.poetry.dev-dependencies]
black = "*"
Expand Down
8 changes: 7 additions & 1 deletion src/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Interface definitions for the dune-sync package."""

from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
from typing import Any, Generic, Protocol, TypeVar

from pandas import DataFrame

Expand All @@ -11,6 +11,12 @@
T = TypeVar("T")


class Named(Protocol):
    """Structural type for any object exposing a string ``name`` attribute.

    Imported by ``src.metrics`` so the metrics decorator can label pushed
    metrics via ``self.name`` without depending on a concrete class.
    """

    # Human-readable identifier read by the metrics decorator.
    name: str


class Validate(ABC):
"""Enforce validation on inheriting classes."""

Expand Down
6 changes: 4 additions & 2 deletions src/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from enum import Enum
from typing import Any

from src.interfaces import Destination, Source
from src.interfaces import Destination, Named, Source
from src.logger import log
from src.metrics import collect_metrics


class Database(Enum):
Expand Down Expand Up @@ -44,7 +45,7 @@ def from_string(cls, value: str) -> Database:


@dataclass
class Job:
class Job(Named):
"""Base class for all data synchronization jobs.
A job represents a single data transfer operation from a source
Expand All @@ -61,6 +62,7 @@ class Job:
source: Source[Any]
destination: Destination[Any]

@collect_metrics
async def run(self) -> None:
"""Execute the job by fetching from the source and saving to the destination.
Expand Down
6 changes: 1 addition & 5 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

from src.args import Args
from src.config import RuntimeConfig
from src.logger import log


async def main() -> None:
Expand Down Expand Up @@ -54,11 +53,8 @@ async def main() -> None:
)

tasks = [job.run() for job in jobs_to_run]
for job, completed_task in zip(
config.jobs, asyncio.as_completed(tasks), strict=False
):
for completed_task in asyncio.as_completed(tasks):
await completed_task
log.info("Job completed: %s", job)


if __name__ == "__main__":
Expand Down
81 changes: 81 additions & 0 deletions src/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""Handle submitting metrics, logs and other interesting details about jobs."""

import uuid
from collections.abc import Awaitable, Callable, Iterable, Mapping
from functools import wraps
from os import getenv as env
from time import perf_counter
from typing import Any

from prometheus_client import CollectorRegistry, Counter, Gauge, push_to_gateway

from src.interfaces import Named
from src.logger import log


def log_job_metrics(prometheus_url: str, job_metrics: dict[str, Any]) -> None:
    """Log metrics about a job to a prometheus pushgateway.

    Builds a fresh registry with three metrics — completion timestamp,
    failure count and duration — populated from ``job_metrics`` (expects
    the keys ``success``, ``duration`` and ``name``), then pushes it to
    the gateway at ``prometheus_url``.
    """
    registry = CollectorRegistry()
    log.info("Pushing metrics to Prometheus")

    # Wall-clock time of this job run's completion.
    finished_at = Gauge(
        name="job_last_success_unixtime",
        documentation="Unix timestamp of job end",
        registry=registry,
    )
    finished_at.set_to_current_time()

    failures = Counter(
        name="job_failure_count",
        documentation="Number of failed jobs",
        registry=registry,
    )
    if not job_metrics["success"]:
        failures.inc()

    duration_gauge = Gauge(
        name="job_last_success_duration",
        documentation="How long did the job take to run (in seconds)",
        registry=registry,
    )
    duration_gauge.set(job_metrics["duration"])

    # Grouping key: one pushgateway job per dune-sync job name.
    push_to_gateway(
        gateway=prometheus_url,
        job=f'dune-sync-{job_metrics["name"]}',
        registry=registry,
    )


def collect_metrics(
    func: Callable[..., Awaitable[Any]],
) -> Callable[..., Awaitable[Any]]:
    """Collect and submit metrics about a Job if a pushgateway is configured.

    Decorator for an async method of a ``Named`` object (e.g. ``Job.run``).
    When the ``PROMETHEUS_PUSHGATEWAY_URL`` environment variable is unset or
    empty, the wrapped function runs untouched. Otherwise the call's duration
    and success/failure are pushed via ``log_job_metrics`` regardless of
    whether the call raised; the original exception (if any) still propagates.
    """

    @wraps(func)
    async def wrapper(
        self: Named, *args: Iterable[Any], **kwargs: Mapping[Any, Any]
    ) -> Any:
        if not (prometheus_url := env("PROMETHEUS_PUSHGATEWAY_URL")):
            # No pushgateway configured: run with zero overhead.
            return await func(self, *args, **kwargs)

        run_id = uuid.uuid4().hex  # unique id for this invocation
        start = perf_counter()
        success = False

        try:
            result = await func(self, *args, **kwargs)
            success = True
            return result
        finally:
            # Runs on success and failure alike; an exception raised by the
            # wrapped call keeps propagating after this block.
            metrics = {
                "duration": perf_counter() - start,
                "name": self.name,
                "run_id": run_id,
                "success": success,
            }
            try:
                log_job_metrics(prometheus_url, metrics)
            except Exception:  # noqa: BLE001
                # A pushgateway outage must not fail the job itself, nor
                # mask the job's own exception with a metrics error.
                log.exception("Failed to push metrics for job %s", self.name)

    return wrapper
27 changes: 26 additions & 1 deletion tests/unit/job_test.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import unittest
from unittest.mock import AsyncMock, patch

import pytest
from dune_client.query import QueryBase

from src.destinations.postgres import PostgresDestination
from src.job import Database, Job
from src.sources.dune import DuneSource


class DatabaseTests(unittest.TestCase):
class DatabaseTests(unittest.IsolatedAsyncioTestCase):
def test_database_resolution(self):
self.assertEqual(Database.POSTGRES, Database.from_string("postgres"))
self.assertEqual(Database.DUNE, Database.from_string("dune"))
Expand All @@ -25,3 +27,26 @@ def test_job_name_formatting(self):
)
sample_job = Job(name="Move the goats to the pen", source=src, destination=dest)
self.assertEqual("Move the goats to the pen", str(sample_job))

@pytest.mark.asyncio
@patch("src.metrics.push_to_gateway")
async def test_metrics_collection(self, mock_metrics_push, *_):
src = DuneSource(api_key="f00b4r", query=QueryBase(query_id=1234))
dest = PostgresDestination(
db_url="postgresql://postgres:postgres@localhost:5432/postgres",
table_name="some_table",
)
src.fetch = AsyncMock()
dest.save = AsyncMock()
test_job = Job(name="job name", source=src, destination=dest)

with patch("src.metrics.env", return_value=None):
await test_job.run()
mock_metrics_push.assert_not_called()

with patch("src.metrics.env", return_value="http://localhost:9090"):
await test_job.run()
mock_metrics_push.assert_called_once()
call_kwargs = mock_metrics_push.mock_calls[0].kwargs
self.assertEqual("http://localhost:9090", call_kwargs["gateway"])
self.assertEqual("dune-sync-job name", call_kwargs["job"])
21 changes: 21 additions & 0 deletions tests/unit/metrics_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import unittest
from unittest.mock import MagicMock, patch

from src.metrics import log_job_metrics


class TestMetrics(unittest.TestCase):
    @patch("src.metrics.push_to_gateway")
    def test_log_job_metrics(self, mock_push):
        """log_job_metrics pushes once, with gateway URL and prefixed job name."""
        fake_job = MagicMock()
        fake_job.name = "mock-job"

        log_job_metrics(
            "https://localhost:9090",
            {"duration": 1, "job": fake_job, "success": False, "name": fake_job.name},
        )

        mock_push.assert_called_once()
        push_kwargs = mock_push.mock_calls[0].kwargs
        self.assertEqual("https://localhost:9090", push_kwargs["gateway"])
        self.assertEqual("dune-sync-mock-job", push_kwargs["job"])

0 comments on commit 224de6e

Please sign in to comment.