From 224de6e63ce521c4c41831fa73d1c48f7d5d787b Mon Sep 17 00:00:00 2001
From: mooster531 <15d1dc293d2e@proton.me>
Date: Wed, 27 Nov 2024 21:08:56 +0100
Subject: [PATCH] Observability via Prometheus Metrics (#92)

Closes #57

---------

Co-authored-by: Benjamin Smith
---
 .github/workflows/coverage.yaml | 10 ++++
 .gitignore                      |  3 ++
 README.md                       |  1 +
 docker-compose.yml              | 48 ++++++++++++++++++-
 poetry.lock                     | 16 ++++++-
 prometheus/config.yml           | 12 +++++
 pyproject.toml                  |  1 +
 src/interfaces.py               |  8 +++-
 src/job.py                      |  6 ++-
 src/main.py                     |  6 +--
 src/metrics.py                  | 81 +++++++++++++++++++++++++++++++++
 tests/unit/job_test.py          | 27 ++++++++++-
 tests/unit/metrics_test.py      | 21 +++++++++
 13 files changed, 229 insertions(+), 11 deletions(-)
 create mode 100644 prometheus/config.yml
 create mode 100644 src/metrics.py
 create mode 100644 tests/unit/metrics_test.py

diff --git a/.github/workflows/coverage.yaml b/.github/workflows/coverage.yaml
index 51ca0b9..6b61e53 100644
--- a/.github/workflows/coverage.yaml
+++ b/.github/workflows/coverage.yaml
@@ -20,6 +20,15 @@ jobs:
         # Copy repo contents into container (needed to populate DB)
         volumes:
           - ${{ github.workspace }}:/repo
+      pushgateway:
+        image: prom/pushgateway
+        options: >-
+          --health-cmd="wget --spider --quiet http://localhost:9091/metrics || exit 1"
+          --health-interval=10s
+          --health-timeout=5s
+          --health-retries=5
+        ports:
+          - 9091:9091
     steps:
       - uses: actions/checkout@v4
       - name: Setup Python
@@ -35,6 +44,7 @@ jobs:
         # Environment variables used by the `pg_client.py`
         env:
           DB_URL: postgresql://postgres:postgres@localhost:5432/postgres
+          PROMETHEUS_PUSHGATEWAY_URL: http://localhost:9091
       - name: Upload coverage report
         uses: actions/upload-artifact@v4
         with:
diff --git a/.gitignore b/.gitignore
index 3bc181b..233e0ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,6 @@ __pycache__/
 .mypy_cache
 .coverage
 coverage.xml
+
+# Metrics
+grafana
diff --git a/README.md b/README.md
index a448fad..16038e7 100644
--- a/README.md
+++ b/README.md
@@ -71,6 +71,7 @@ Copy `.env.sample` to `.env` and fill out the two required variables
 
 - `DUNE_API_KEY` - Valid API key for [Dune](https://dune.com/)
 - `DB_URL` - Connection string for the source and/or destination PostgreSQL database, in the form `postgresql://postgres:postgres@localhost:5432/postgres`
+- (Optional) `PROMETHEUS_PUSHGATEWAY_URL` - URL of a [Prometheus Pushgateway](https://github.com/prometheus/pushgateway) which receives job-related metrics.
 
 ### Mount the config and .env files into the container and run the script
 
diff --git a/docker-compose.yml b/docker-compose.yml
index b262f6f..02fbd25 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,52 @@ services:
       - "5432:5432"
     volumes:
       - pg_data:/var/lib/postgresql/data
+  prometheus:
+    image: prom/prometheus
+    container_name: prometheus
+    volumes:
+      - ./prometheus/:/etc/prometheus/
+      - prometheus_data:/prometheus
+    command:
+      - '--config.file=/etc/prometheus/config.yml'
+      - '--storage.tsdb.path=/prometheus'
+      - '--web.console.libraries=/etc/prometheus/console_libraries'
+      - '--web.console.templates=/etc/prometheus/consoles'
+      - '--web.enable-lifecycle'
+    ports:
+      - "9090:9090"
+    networks:
+      - monitor-net
+
+  pushgateway:
+    image: prom/pushgateway
+    container_name: pushgateway
+    expose:
+      - 9091
+    ports:
+      - "9091:9091"
+    networks:
+      - monitor-net
+  grafana:
+    image: grafana/grafana
+    container_name: grafana
+    volumes:
+      - ./grafana/data:/var/lib/grafana
+      - ./grafana/datasources:/etc/grafana/datasources
+      - ./grafana/dashboards:/etc/grafana/dashboards
+    environment:
+      - GF_SECURITY_ADMIN_USER=${ADMIN_USER:-admin}
+      - GF_SECURITY_ADMIN_PASSWORD=${ADMIN_PASSWORD:-admin}
+      - GF_USERS_ALLOW_SIGN_UP=false
+    ports:
+      - "3000:3000"
+    networks:
+      - monitor-net
+    user: "1000"
 
 volumes:
-  pg_data:
\ No newline at end of file
+  pg_data:
+  prometheus_data: { }
+networks:
+  monitor-net:
+    driver: bridge
diff --git a/poetry.lock b/poetry.lock
index db2b9ac..33ec7bb 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1217,6 +1217,20 @@ nodeenv = ">=0.11.1"
 pyyaml = ">=5.1"
 virtualenv = ">=20.10.0"
 
+[[package]]
+name = "prometheus-client"
+version = "0.21.0"
+description = "Python client for the Prometheus monitoring system."
+optional = false
+python-versions = ">=3.8"
+files = [
+    {file = "prometheus_client-0.21.0-py3-none-any.whl", hash = "sha256:4fa6b4dd0ac16d58bb587c04b1caae65b8c5043e85f778f42f5f632f6af2e166"},
+    {file = "prometheus_client-0.21.0.tar.gz", hash = "sha256:96c83c606b71ff2b0a433c98889d275f51ffec6c5e267de37c7a2b5c9aa9233e"},
+]
+
+[package.extras]
+twisted = ["twisted"]
+
 [[package]]
 name = "propcache"
 version = "0.2.0"
@@ -2088,4 +2102,4 @@ propcache = ">=0.2.0"
 [metadata]
 lock-version = "2.0"
 python-versions = ">=3.12,<3.14"
-content-hash = "ed529465b53e02e490291fc8e44e0825a177c7ed98c70b1a6200e183075dfbe3"
+content-hash = "8e35c0f43657930aa73e0bcb3df5fe4cb1a4dba53d1f58088f9133272b1b7510"
diff --git a/prometheus/config.yml b/prometheus/config.yml
new file mode 100644
index 0000000..61a91da
--- /dev/null
+++ b/prometheus/config.yml
@@ -0,0 +1,12 @@
+global:
+  scrape_interval: 15s
+  evaluation_interval: 15s
+
+scrape_configs:
+  - job_name: 'prometheus'
+    static_configs:
+      - targets: ['localhost:9090']
+
+  - job_name: 'pushgateway'
+    static_configs:
+      - targets: ['pushgateway:9091']
diff --git a/pyproject.toml b/pyproject.toml
index 5c37012..14f25d6 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -21,6 +21,7 @@ python-dotenv = "*"
 psycopg2-binary = "*"
 tomli = "*"
 pyyaml = "*"
+prometheus-client = "*"
 
 [tool.poetry.dev-dependencies]
 black = "*"
diff --git a/src/interfaces.py b/src/interfaces.py
index e91b0d2..92752eb 100644
--- a/src/interfaces.py
+++ b/src/interfaces.py
@@ -1,7 +1,7 @@
 """Interface definitions for the dune-sync package."""
 
 from abc import ABC, abstractmethod
-from typing import Any, Generic, TypeVar
+from typing import Any, Generic, Protocol, TypeVar
 
 from pandas import DataFrame
 
@@ -11,6 +11,12 @@
 T = TypeVar("T")
 
 
+class Named(Protocol):
+    """Represents any class with name field."""
+
+    name: str
+
+
 class Validate(ABC):
     """Enforce validation on inheriting classes."""
 
diff --git a/src/job.py b/src/job.py
index 9cda076..71e6559 100644
--- a/src/job.py
+++ b/src/job.py
@@ -6,8 +6,9 @@
 from enum import Enum
 from typing import Any
 
-from src.interfaces import Destination, Source
+from src.interfaces import Destination, Named, Source
 from src.logger import log
+from src.metrics import collect_metrics
 
 
 class Database(Enum):
@@ -44,7 +45,7 @@ def from_string(cls, value: str) -> Database:
 
 
 @dataclass
-class Job:
+class Job(Named):
     """Base class for all data synchronization jobs.
 
     A job represents a single data transfer operation from a source
@@ -61,6 +62,7 @@ class Job:
     source: Source[Any]
     destination: Destination[Any]
 
+    @collect_metrics
     async def run(self) -> None:
         """Execute the job by fetching from the source and saving to the destination.
 
diff --git a/src/main.py b/src/main.py
index 4ef1e12..256a3dd 100644
--- a/src/main.py
+++ b/src/main.py
@@ -25,7 +25,6 @@
 
 from src.args import Args
 from src.config import RuntimeConfig
-from src.logger import log
 
 
 async def main() -> None:
@@ -54,11 +53,8 @@ async def main() -> None:
     )
 
     tasks = [job.run() for job in jobs_to_run]
-    for job, completed_task in zip(
-        config.jobs, asyncio.as_completed(tasks), strict=False
-    ):
+    for completed_task in asyncio.as_completed(tasks):
         await completed_task
-        log.info("Job completed: %s", job)
 
 
 if __name__ == "__main__":
diff --git a/src/metrics.py b/src/metrics.py
new file mode 100644
index 0000000..8863be1
--- /dev/null
+++ b/src/metrics.py
@@ -0,0 +1,81 @@
+"""Handle submitting metrics, logs and other interesting details about jobs."""
+
+import uuid
+from collections.abc import Awaitable, Callable, Iterable, Mapping
+from functools import wraps
+from os import getenv as env
+from time import perf_counter
+from typing import Any
+
+from prometheus_client import CollectorRegistry, Counter, Gauge, push_to_gateway
+
+from src.interfaces import Named
+from src.logger import log
+
+
+def log_job_metrics(prometheus_url: str, job_metrics: dict[str, Any]) -> None:
+    """Log metrics about a job to a prometheus pushgateway."""
+    registry = CollectorRegistry()
+    log.info("Pushing metrics to Prometheus")
+
+    job_success_timestamp = Gauge(
+        name="job_last_success_unixtime",
+        documentation="Unix timestamp of job end",
+        registry=registry,
+    )
+    job_success_timestamp.set_to_current_time()
+
+    job_failure_counter = Counter(
+        name="job_failure_count",
+        documentation="Number of failed jobs",
+        registry=registry,
+    )
+    job_failure_counter.inc(int(not job_metrics["success"]))
+
+    job_duration_metric = Gauge(
+        name="job_last_success_duration",
+        documentation="How long did the job take to run (in seconds)",
+        registry=registry,
+    )
+    job_duration_metric.set(job_metrics["duration"])
+    push_to_gateway(
+        gateway=prometheus_url,
+        job=f'dune-sync-{job_metrics["name"]}',
+        registry=registry,
+    )
+
+
+def collect_metrics(
+    func: Callable[..., Awaitable[Any]],
+) -> Callable[..., Awaitable[Any]]:
+    """Collect and submit metrics about a Job if a pushgateway is configured."""
+
+    @wraps(func)
+    async def wrapper(
+        self: Named, *args: Iterable[Any], **kwargs: Mapping[Any, Any]
+    ) -> Any:
+        if not (prometheus_url := env("PROMETHEUS_PUSHGATEWAY_URL")):
+            return await func(self, *args, **kwargs)
+
+        run_id = uuid.uuid4().hex
+        start = perf_counter()
+        success = False
+
+        try:
+            result = await func(self, *args, **kwargs)
+            success = True
+            return result
+        except Exception:
+            success = False
+            raise
+        finally:
+            duration = perf_counter() - start
+            metrics = {
+                "duration": duration,
+                "name": self.name,
+                "run_id": run_id,
+                "success": success,
+            }
+            log_job_metrics(prometheus_url, metrics)
+
+    return wrapper
diff --git a/tests/unit/job_test.py b/tests/unit/job_test.py
index c249129..1ea39c2 100644
--- a/tests/unit/job_test.py
+++ b/tests/unit/job_test.py
@@ -1,5 +1,7 @@
 import unittest
+from unittest.mock import AsyncMock, patch
 
+import pytest
 from dune_client.query import QueryBase
 
 from src.destinations.postgres import PostgresDestination
@@ -7,7 +9,7 @@
 from src.sources.dune import DuneSource
 
 
-class DatabaseTests(unittest.TestCase):
+class DatabaseTests(unittest.IsolatedAsyncioTestCase):
     def test_database_resolution(self):
         self.assertEqual(Database.POSTGRES, Database.from_string("postgres"))
         self.assertEqual(Database.DUNE, Database.from_string("dune"))
@@ -25,3 +27,26 @@ def test_job_name_formatting(self):
         )
         sample_job = Job(name="Move the goats to the pen", source=src, destination=dest)
         self.assertEqual("Move the goats to the pen", str(sample_job))
+
+    @pytest.mark.asyncio
+    @patch("src.metrics.push_to_gateway")
+    async def test_metrics_collection(self, mock_metrics_push, *_):
+        src = DuneSource(api_key="f00b4r", query=QueryBase(query_id=1234))
+        dest = PostgresDestination(
+            db_url="postgresql://postgres:postgres@localhost:5432/postgres",
+            table_name="some_table",
+        )
+        src.fetch = AsyncMock()
+        dest.save = AsyncMock()
+        test_job = Job(name="job name", source=src, destination=dest)
+
+        with patch("src.metrics.env", return_value=None):
+            await test_job.run()
+            mock_metrics_push.assert_not_called()
+
+        with patch("src.metrics.env", return_value="http://localhost:9090"):
+            await test_job.run()
+            mock_metrics_push.assert_called_once()
+            call_kwargs = mock_metrics_push.mock_calls[0].kwargs
+            self.assertEqual("http://localhost:9090", call_kwargs["gateway"])
+            self.assertEqual("dune-sync-job name", call_kwargs["job"])
diff --git a/tests/unit/metrics_test.py b/tests/unit/metrics_test.py
new file mode 100644
index 0000000..f170439
--- /dev/null
+++ b/tests/unit/metrics_test.py
@@ -0,0 +1,21 @@
+import unittest
+from unittest.mock import MagicMock, patch
+
+from src.metrics import log_job_metrics
+
+
+class TestMetrics(unittest.TestCase):
+    @patch("src.metrics.push_to_gateway")
+    def test_log_job_metrics(self, mock_push):
+        job = MagicMock()
+        job.name = "mock-job"
+
+        log_job_metrics(
+            "https://localhost:9090",
+            {"duration": 1, "job": job, "success": False, "name": job.name},
+        )
+        self.assertEqual(1, mock_push.call_count)
+        self.assertEqual(
+            "https://localhost:9090", mock_push.mock_calls[0].kwargs["gateway"]
+        )
+        self.assertEqual("dune-sync-mock-job", mock_push.mock_calls[0].kwargs["job"])
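
A quick way to sanity-check the pushgateway wiring by hand is to call log_job_metrics directly. This is only a sketch: it assumes the pushgateway from docker-compose.yml is reachable on localhost:9091, and the metric values are invented for illustration.

# Sketch: push one synthetic datapoint so the Prometheus/Grafana side can be
# checked before any real Job has run. Assumes the pushgateway service from
# docker-compose.yml is up and listening on localhost:9091.
from src.metrics import log_job_metrics

log_job_metrics(
    "http://localhost:9091",
    {
        "name": "manual-smoke-test",  # pushed under the job label "dune-sync-manual-smoke-test"
        "duration": 1.23,             # seconds, exported as job_last_success_duration
        "success": True,              # False would increment job_failure_count
    },
)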
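
The decorator itself only relies on the Named protocol (anything with a name attribute), so it can also be exercised without a real source/destination pair. A minimal sketch follows; DummyJob is a hypothetical stand-in, not part of the repo, and a running pushgateway is assumed at the configured URL.

# Sketch: run collect_metrics outside of Job. DummyJob merely satisfies the
# Named protocol. If PROMETHEUS_PUSHGATEWAY_URL is unset the decorator is a
# pass-through; if set, metrics are pushed to that gateway when run() finishes.
import asyncio
import os
from dataclasses import dataclass

from src.metrics import collect_metrics


@dataclass
class DummyJob:
    name: str

    @collect_metrics
    async def run(self) -> None:
        await asyncio.sleep(0.1)  # stand-in for the real fetch/save work


if __name__ == "__main__":
    os.environ.setdefault("PROMETHEUS_PUSHGATEWAY_URL", "http://localhost:9091")
    asyncio.run(DummyJob(name="dummy").run())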
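
To confirm a push actually landed, the pushgateway re-exposes everything it holds on its own /metrics endpoint. Something like the sketch below (again assuming the compose stack is running locally) should list the three series introduced here, job_last_success_unixtime, job_failure_count and job_last_success_duration, grouped under a job="dune-sync-<job name>" label.

# Sketch: print the dune-sync series currently held by the pushgateway.
from urllib.request import urlopen

with urlopen("http://localhost:9091/metrics") as resp:
    for line in resp.read().decode().splitlines():
        if 'job="dune-sync-' in line:
            print(line)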