Skip to content

Commit

Permalink
Merge branch 'main' into dune-sync-57
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith authored Nov 27, 2024
2 parents 125f2c6 + d9a7e65 commit b637d4f
Show file tree
Hide file tree
Showing 18 changed files with 675 additions and 95 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Install Requirements
run: POETRY_VIRTUALENVS_CREATE=false python -m poetry install
- name: Run tests with coverage
run: pytest --cov=src --cov-report=html --cov-fail-under=80
run: pytest --cov=src --cov-report=html --cov-fail-under=90
# Environment variables used by the `pg_client.py`
env:
DB_URL: postgresql://postgres:postgres@localhost:5432/postgres
Expand All @@ -49,4 +49,4 @@ jobs:
uses: actions/upload-artifact@v4
with:
name: coverage-report
path: coverage.xml
path: htmlcov
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ clean:

fmt:
black ./
ruff check --fix .

lint:
ruff check .
pylint src/

types:
mypy ${PROJECT_ROOT}/ --strict
Expand Down
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ The configuration file consists of three main sections:
- `data_sources`: Defines available databases
- `jobs`: Defines synchronization jobs that connect sources to destinations

The config file may contain environment variable placeholders in
[envsubst](https://www.gnu.org/software/gettext/manual/html_node/envsubst-Invocation.html)-compatible format:
- `$VAR_NAME`
- `${VAR_NAME}`
- `$varname`

**Note**: Every variable referenced this way __must__ be defined at runtime,
otherwise the program will exit with an error.

#### Data Source Definitions

Sources are defined as a list of configurations, each containing:
Expand Down Expand Up @@ -90,6 +99,8 @@ docker run --rm \
# - Mount queries directory (if using SQL file paths)
-v "$(pwd)/queries:/app/queries" \
--config /app/my-config.yaml
# - Specify jobs to run (if not specified, all jobs will be run)
--jobs job1 job2
```

Note that postgres queries can also be file paths (they would also need to be mounted into the container).
Expand All @@ -100,7 +111,7 @@ Fill out the empty fields in [Sample Env](.env.sample) (e.g. `DUNE_API_KEY` and

```shell
docker-compose up -d # Starts postgres container (in the background)
python -m src.main --config config.yaml
python -m src.main [--config config.yaml] [--jobs d2p-test-1 p2d-test]
```

### Development Commands
Expand Down
10 changes: 6 additions & 4 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data_sources:
key: ${DB_URL}

jobs:
- name: Sync Parameterized Dune Query with Multiple Types to Postgres
- name: d2p-test-1
source:
ref: Dune
query_id: 4238114
Expand All @@ -26,17 +26,19 @@ jobs:
destination:
ref: PG
table_name: results_4238114
if_exists: replace
if_exists: upsert
index_columns:
- hash

- name: Table Independent Query to Dune
- name: p2d-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
destination:
ref: Dune
table_name: dune_sync_test_table

- name: Sync Non-parameterized Dune Query with Multiple Types to Postgres
- name: p2d-test-2
source:
ref: Dune
query_id: 4273244
Expand Down
42 changes: 42 additions & 0 deletions src/args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
"""Command line argument parser for dune-sync application."""

from __future__ import annotations

import argparse
from dataclasses import dataclass
from pathlib import Path

from src import root_path


@dataclass
class Args:
"""Command line argument parser for dune-sync application."""

config: Path
jobs: list[str] | None

@classmethod
def from_command_line(cls) -> Args:
"""Create Args instance from command line arguments."""
parser = argparse.ArgumentParser(
description="Dune Sync - Data synchronization tool"
)
parser.add_argument(
"--config",
type=Path,
default=root_path.parent / "config.yaml",
help="Path to configuration file (default: config.yaml)",
)
parser.add_argument(
"--jobs",
type=str,
nargs="*", # accepts zero or more arguments
default=None,
help="Names of specific jobs to run (default: run all jobs)",
)
args = parser.parse_args()
return cls(
config=args.config,
jobs=args.jobs if args.jobs else None, # Convert empty list to None
)
72 changes: 24 additions & 48 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@
from dataclasses import dataclass
from pathlib import Path
from string import Template
from typing import Any
from typing import Any, TextIO

import yaml
from dotenv import load_dotenv
from dune_client.query import QueryBase
from dune_client.types import ParameterType, QueryParameter

from src.destinations.dune import DuneDestination
from src.destinations.postgres import PostgresDestination
from src.interfaces import Destination, Source
from src.job import Database, Job
from src.sources.dune import DuneSource
from src.sources.dune import DuneSource, parse_query_parameters
from src.sources.postgres import PostgresSource


Expand Down Expand Up @@ -117,47 +116,6 @@ def interpolate(value: str) -> str:
raise KeyError(f"Environment variable '{missing_var}' not found. ") from e


def parse_query_parameters(params: list[dict[str, Any]]) -> list[QueryParameter]:
"""Convert a list of parameter dictionaries into Dune query parameters.
Args:
params (list[dict[str, Any]]): List of parameter dictionaries, each containing:
- name: Parameter name
- type: Parameter type (TEXT, NUMBER, DATE, or ENUM)
- value: Parameter value
Returns:
list[QueryParameter]: List of properly typed Dune query parameters
Raises:
ValueError: If an unknown parameter type is encountered
"""
query_params = []
for param in params:
name = param["name"]
param_type = ParameterType.from_string(param["type"])
value = param["value"]

if param_type == ParameterType.TEXT:
query_params.append(QueryParameter.text_type(name, value))
elif param_type == ParameterType.NUMBER:
query_params.append(QueryParameter.number_type(name, value))
elif param_type == ParameterType.DATE:
query_params.append(QueryParameter.date_type(name, value))
elif param_type == ParameterType.ENUM:
query_params.append(QueryParameter.enum_type(name, value))
else:
# Can't happen.
# this code is actually unreachable because the case it handles
# causes an exception to be thrown earlier, in ParameterType.from_string()
raise ValueError(
f"Unknown parameter type: {param['type']}"
) # pragma: no cover

return query_params


@dataclass
class RuntimeConfig:
"""A class to represent the runtime configuration settings.
Expand All @@ -172,8 +130,25 @@ class RuntimeConfig:

jobs: list[Job]

def __post_init__(self) -> None:
"""Validate that unique job names are used."""
job_names = [job.name for job in self.jobs]
if len(job_names) != len(set(job_names)):
duplicates = {name for name in job_names if job_names.count(name) > 1}
raise ValueError(
f"Duplicate job names found in configuration: {', '.join(duplicates)}"
)

@classmethod
def read_yaml(cls, file_handle: TextIO) -> Any:
"""Load YAML from text, substituting any environment variables referenced."""
Env.load()
text = str(file_handle.read())
text = Env.interpolate(text)
return yaml.safe_load(text)

@classmethod
def load_from_yaml(cls, file_path: Path | str = "config.yaml") -> RuntimeConfig:
def load(cls, file_path: Path | str = "config.yaml") -> RuntimeConfig:
"""Load and parse a YAML configuration file.
Args:
Expand All @@ -188,8 +163,8 @@ def load_from_yaml(cls, file_path: Path | str = "config.yaml") -> RuntimeConfig:
ValueError: If the configuration contains invalid database types
"""
with open(file_path, "rb") as _handle:
data = yaml.safe_load(_handle)
with open(file_path, encoding="utf-8") as _handle:
data = cls.read_yaml(_handle)

# Load data sources map
sources = {}
Expand Down Expand Up @@ -294,6 +269,7 @@ def _build_destination(
return PostgresDestination(
db_url=dest.key,
table_name=dest_config["table_name"],
if_exists=dest_config["if_exists"],
if_exists=dest_config.get("if_exists", "append"),
index_columns=dest_config.get("index_columns", []),
)
raise ValueError(f"Unsupported destination_db type: {dest}")
Loading

0 comments on commit b637d4f

Please sign in to comment.