Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specific Job Runner as Runtime Argument #100

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clean:

fmt:
black ./
ruff check --fix .

lint:
ruff check .
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ docker run --rm \
# - Mount queries directory (if using SQL file paths)
-v "$(pwd)/queries:/app/queries" \
--config /app/my-config.yaml
# - Specify jobs to run (if not specified, all jobs will be run)
--jobs job1 job2
```

Note that postgres queries can also be file paths (they would also need to be mounted into the container).
Expand All @@ -99,7 +101,7 @@ Fill out the empty fields in [Sample Env](.env.sample) (e.g. `DUNE_API_KEY` and

```shell
docker-compose up -d # Starts postgres container (in the background)
python -m src.main --config config.yaml
python -m src.main [--config config.yaml] [--jobs d2p-test-1 p2d-test]
```

### Development Commands
Expand Down
6 changes: 3 additions & 3 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data_sources:
key: ${DB_URL}

jobs:
- name: Sync Parameterized Dune Query with Multiple Types to Postgres
- name: d2p-test-1
source:
ref: Dune
query_id: 4238114
Expand All @@ -30,15 +30,15 @@ jobs:
index_columns:
- hash

- name: Table Independent Query to Dune
- name: p2d-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
destination:
ref: Dune
table_name: dune_sync_test_table

- name: Sync Parameterized Dune Query with Multiple Types to Postgres
- name: p2d-test-2
source:
ref: Dune
query_id: 4273244
Expand Down
42 changes: 42 additions & 0 deletions src/args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Command line argument parser for dune-sync application."""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from pathlib import Path

from src import root_path


@dataclass
class Args:
"""Command line argument parser for dune-sync application."""

config: Path
jobs: list[str] | None

@classmethod
def from_command_line(cls) -> Args:
"""Create Args instance from command line arguments."""
parser = argparse.ArgumentParser(
description="Dune Sync - Data synchronization tool"
)
parser.add_argument(
"--config",
type=Path,
default=root_path.parent / "config.yaml",
help="Path to configuration file (default: config.yaml)",
)
parser.add_argument(
"--jobs",
type=str,
nargs="*", # accepts zero or more arguments
default=None,
help="Names of specific jobs to run (default: run all jobs)",
)
args = parser.parse_args()
return cls(
config=args.config,
jobs=args.jobs if args.jobs else None, # Convert empty list to None
)
11 changes: 10 additions & 1 deletion src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,15 @@ class RuntimeConfig:

jobs: list[Job]

def __post_init__(self) -> None:
"""Validate that unique job names are used."""
job_names = [job.name for job in self.jobs]
if len(job_names) != len(set(job_names)):
duplicates = {name for name in job_names if job_names.count(name) > 1}
raise ValueError(
f"Duplicate job names found in configuration: {', '.join(duplicates)}"
)

@classmethod
def load_from_yaml(cls, file_path: Path | str = "config.yaml") -> RuntimeConfig:
"""Load and parse a YAML configuration file.
Expand Down Expand Up @@ -294,7 +303,7 @@ def _build_destination(
return PostgresDestination(
db_url=dest.key,
table_name=dest_config["table_name"],
if_exists=dest_config["if_exists"],
if_exists=dest_config.get("if_exists", "append"),
mooster531 marked this conversation as resolved.
Show resolved Hide resolved
index_columns=dest_config.get("index_columns", []),
)
raise ValueError(f"Unsupported destination_db type: {dest}")
26 changes: 10 additions & 16 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@
Typically includes database connection strings and API keys.

"""

import argparse
import asyncio
from pathlib import Path

from src import root_path
from src.args import Args
from src.config import RuntimeConfig
from src.logger import log

Expand All @@ -45,20 +42,17 @@ async def main() -> None:
Various exceptions depending on job configuration and execution

"""
parser = argparse.ArgumentParser(
description="Dune Sync - Data synchronization tool"
)
parser.add_argument(
"--config",
type=Path,
default=root_path.parent / "config.yaml",
help="Path to configuration file (default: config.yaml)",
)
args = parser.parse_args()

args = Args.from_command_line()
config = RuntimeConfig.load_from_yaml(args.config)

tasks = [job.run() for job in config.jobs]
# Filter jobs if specific ones were requested
jobs_to_run = (
[job for job in config.jobs if job.name in args.jobs]
if args.jobs is not None
else config.jobs
)

tasks = [job.run() for job in jobs_to_run]
for job, completed_task in zip(
config.jobs, asyncio.as_completed(tasks), strict=False
):
Expand Down
24 changes: 24 additions & 0 deletions tests/fixtures/config/invalid_names.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
data_sources:
- name: postgres
type: postgres
key: ${DB_URL}

jobs:
- name: jobName
source:
ref: postgres
table_name: table1
query_string: SELECT 1;
destination:
ref: postgres
table_name: table2

- name: jobName
source:
ref: postgres
table_name: table1
query_string: SELECT 2;
destination:
ref: postgres
table_name: table2
55 changes: 55 additions & 0 deletions tests/unit/args_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path
from unittest.mock import patch

from src import root_path
from src.args import Args


def test_args_default_values():
"""Test Args parser with default values (no command line arguments)."""
with patch("sys.argv", ["script.py"]):
args = Args.from_command_line()

assert args.config == root_path.parent / "config.yaml"
assert args.jobs is None


def test_args_custom_config():
"""Test Args parser with custom config path."""
test_config = Path("/custom/path/config.yaml")
with patch("sys.argv", ["script.py", "--config", str(test_config)]):
args = Args.from_command_line()

assert args.config == test_config
assert args.jobs is None


def test_args_with_jobs():
"""Test Args parser with specific jobs."""
with patch("sys.argv", ["script.py", "--jobs", "job1", "job2"]):
args = Args.from_command_line()

assert args.config == root_path.parent / "config.yaml"
assert args.jobs == ["job1", "job2"]


def test_args_with_empty_jobs():
"""Test Args parser with empty jobs list."""
with patch("sys.argv", ["script.py", "--jobs"]):
args = Args.from_command_line()

assert args.config == root_path.parent / "config.yaml"
assert args.jobs is None


def test_args_with_all_options():
"""Test Args parser with both config and jobs specified."""
test_config = Path("/custom/path/config.yaml")
with patch(
"sys.argv",
["script.py", "--config", str(test_config), "--jobs", "job1", "job2"],
):
args = Args.from_command_line()

assert args.config == test_config
assert args.jobs == ["job1", "job2"]
9 changes: 9 additions & 0 deletions tests/unit/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ def test_load_basic_conf(self):
self.assertEqual(2, len(conf.jobs))
# TODO: come up with more explicit assertions.

def test_load_invalid_names(self):
config_file = config_root / "invalid_names.yaml"
with self.assertRaises(ValueError) as context:
RuntimeConfig.load_from_yaml(config_file.absolute())
self.assertIn(
"Duplicate job names found in configuration: jobName",
str(context.exception),
)

def test_load_unsupported_conf(self):
with self.assertRaises(ValueError) as context:
RuntimeConfig.load_from_yaml(config_root / "unsupported_source.yaml")
Expand Down