diff --git a/src/dune_to_local/__init__.py b/src/dune_to_local/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/mappings.py b/src/dune_to_local/mappings.py
similarity index 100%
rename from src/mappings.py
rename to src/dune_to_local/mappings.py
diff --git a/src/dune_to_local/postgres.py b/src/dune_to_local/postgres.py
new file mode 100644
index 0000000..9f7fbd6
--- /dev/null
+++ b/src/dune_to_local/postgres.py
@@ -0,0 +1,76 @@
+from typing import Any
+
+import pandas as pd
+import sqlalchemy
+from dune_client.client import DuneClient
+from dune_client.models import ExecutionResult
+from dune_client.query import QueryBase
+from pandas import DataFrame
+from sqlalchemy import create_engine
+
+from src.config import DuneToLocalJob, Env
+from src.dune_to_local.mappings import DUNE_TO_PG
+from src.logger import log
+
+DataTypes = dict[str, Any]
+
+
+def reformat_varbinary_columns(
+    df: DataFrame, varbinary_columns: list[str]
+) -> DataFrame:
+    for col in varbinary_columns:
+        df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
+    return df
+
+
+def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
+    metadata = result.metadata
+    dtypes, varbinary_columns = {}, []
+    for name, d_type in zip(metadata.column_names, metadata.column_types):
+        dtypes[name] = DUNE_TO_PG[d_type]
+        if d_type == "varbinary":
+            varbinary_columns.append(name)
+
+    df = pd.DataFrame(result.rows)
+    # escape bytes
+    df = reformat_varbinary_columns(df, varbinary_columns)
+    return df, dtypes
+
+
+def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, DataTypes]:
+    dune = DuneClient(dune_key, performance=job.query_engine)
+    response = dune.run_query(
+        query=QueryBase(
+            query_id=job.query_id,
+            params=[
+                # TODO: https://github.com/bh2smith/dune-sync/issues/30
+            ],
+        ),
+        ping_frequency=job.poll_frequency,
+    )
+    if response.result is None:
+        raise ValueError("Query execution failed!")
+    return dune_result_to_df(response.result)
+
+
+def save_to_postgres(
+    engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
+) -> None:
+    if df.empty:
+        log.warning("DataFrame is empty. Skipping save to PostgreSQL.")
+        return
+    with engine.connect() as connection:
+        df.to_sql(
+            table_name, connection, if_exists="replace", index=False, dtype=dtypes
+        )
+    log.info("Data saved to %s successfully!", table_name)
+
+
+def dune_to_postgres(env: Env, job: DuneToLocalJob) -> None:
+    df, types = fetch_dune_data(env.dune_api_key, job)
+    if not df.empty:
+        # Skip engine creation if unnecessary.
+        engine = create_engine(env.db_url)
+        save_to_postgres(engine, job.table_name, df, types)
+    else:
+        log.warning("No Query results found! Skipping write")
diff --git a/src/main.py b/src/main.py
index 63c39eb..57156e6 100644
--- a/src/main.py
+++ b/src/main.py
@@ -1,56 +1,5 @@
-from typing import Any
-
-import pandas as pd
-import sqlalchemy
-from dune_client.client import DuneClient
-from dune_client.models import ExecutionResult
-from dune_client.query import QueryBase
-from pandas import DataFrame
-from sqlalchemy import create_engine
-
 from src.config import Env, RuntimeConfig
-from src.logger import log
-from src.mappings import DUNE_TO_PG
-
-DataTypes = dict[str, Any]
-
-
-def reformat_varbinary_columns(
-    df: DataFrame, varbinary_columns: list[str]
-) -> DataFrame:
-    for col in varbinary_columns:
-        df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
-    return df
-
-
-def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
-    metadata = result.metadata
-    dtypes, varbinary_columns = {}, []
-    for name, d_type in zip(metadata.column_names, metadata.column_types):
-        dtypes[name] = DUNE_TO_PG[d_type]
-        if d_type == "varbinary":
-            varbinary_columns.append(name)
-
-    df = pd.DataFrame(result.rows)
-    # escape bytes
-    df = reformat_varbinary_columns(df, varbinary_columns)
-    return df, dtypes
-
-
-def fetch_dune_data(
-    dune: DuneClient, query: QueryBase, ping_frequency: int
-) -> tuple[DataFrame, DataTypes]:
-    result = dune.run_query(query, ping_frequency).result
-    if result is None:
-        raise ValueError("Query execution failed!")
-    return dune_result_to_df(result)
-
-
-def save_to_postgres(
-    engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
-) -> None:
-    df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes)
-    log.info("Data saved to %s successfully!", table_name)
+from src.dune_to_local.postgres import dune_to_postgres
 
 
 def main() -> None:
@@ -58,14 +7,7 @@ def main() -> None:
     config = RuntimeConfig.load_from_toml("config.toml")
     # TODO: Async job execution https://github.com/bh2smith/dune-sync/issues/20
     for job in config.jobs:
-        df, types = fetch_dune_data(
-            dune=DuneClient(env.dune_api_key, performance=job.query_engine),
-            query=QueryBase(job.query_id),
-            ping_frequency=job.poll_frequency,
-        )
-        if df is not None:
-            engine = create_engine(env.db_url)
-            save_to_postgres(engine, job.table_name, df, types)
+        dune_to_postgres(env, job)
 
 
 if __name__ == "__main__":
diff --git a/tests/e2e_test.py b/tests/e2e_test.py
index 2dffa59..a7b645b 100644
--- a/tests/e2e_test.py
+++ b/tests/e2e_test.py
@@ -9,7 +9,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.dialects.postgresql import BYTEA
 
-from src.main import save_to_postgres, dune_result_to_df
+from src.dune_to_local.postgres import save_to_postgres, dune_result_to_df
 from tests.db_util import query_pg
 
 DB_URL = getenv("DB_URL", "postgresql://postgres:postgres@localhost:5432/postgres")