From 8b138e326cd7ddecd436077a3451c66afb324a08 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Sat, 26 Oct 2024 20:23:30 +0200 Subject: [PATCH 1/4] restructure project and simplify main entry point --- src/dune_to_local/__init__.py | 0 src/dune_to_local/main.py | 64 +++++++++++++++++++++++++++++ src/{ => dune_to_local}/mappings.py | 0 src/main.py | 62 +--------------------------- tests/e2e_test.py | 2 +- 5 files changed, 67 insertions(+), 61 deletions(-) create mode 100644 src/dune_to_local/__init__.py create mode 100644 src/dune_to_local/main.py rename src/{ => dune_to_local}/mappings.py (100%) diff --git a/src/dune_to_local/__init__.py b/src/dune_to_local/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/dune_to_local/main.py b/src/dune_to_local/main.py new file mode 100644 index 0000000..a0b03b6 --- /dev/null +++ b/src/dune_to_local/main.py @@ -0,0 +1,64 @@ +from typing import Any + +import pandas as pd +import sqlalchemy +from dune_client.client import DuneClient +from dune_client.models import ExecutionResult +from dune_client.query import QueryBase +from pandas import DataFrame +from sqlalchemy import create_engine + +from src.config import DuneToLocalJob, Env +from src.dune_to_local.mappings import DUNE_TO_PG +from src.logger import log + +DataTypes = dict[str, Any] + + +def reformat_varbinary_columns( + df: DataFrame, varbinary_columns: list[str] +) -> DataFrame: + for col in varbinary_columns: + df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x) + return df + + +def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]: + metadata = result.metadata + dtypes, varbinary_columns = {}, [] + for name, d_type in zip(metadata.column_names, metadata.column_types): + dtypes[name] = DUNE_TO_PG[d_type] + if d_type == "varbinary": + varbinary_columns.append(name) + + df = pd.DataFrame(result.rows) + # escape bytes + df = reformat_varbinary_columns(df, varbinary_columns) + return df, dtypes + + +def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, DataTypes]: + result = ( + DuneClient(dune_key, performance=job.query_engine) + .run_query(query=QueryBase(job.query_id), ping_frequency=job.poll_frequency) + .result + ) + if result is None: + raise ValueError("Query execution failed!") + return dune_result_to_df(result) + + +def save_to_postgres( + engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes +) -> None: + df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes) + log.info("Data saved to %s successfully!", table_name) + + +def dune_to_postgres(env: Env, job: DuneToLocalJob) -> None: + df, types = fetch_dune_data(env.dune_api_key, job) + if df is not None: + engine = create_engine(env.db_url) + save_to_postgres(engine, job.table_name, df, types) + else: + log.warning("No Query results found! Skipping write") diff --git a/src/mappings.py b/src/dune_to_local/mappings.py similarity index 100% rename from src/mappings.py rename to src/dune_to_local/mappings.py diff --git a/src/main.py b/src/main.py index 63c39eb..00ebd5d 100644 --- a/src/main.py +++ b/src/main.py @@ -1,56 +1,5 @@ -from typing import Any - -import pandas as pd -import sqlalchemy -from dune_client.client import DuneClient -from dune_client.models import ExecutionResult -from dune_client.query import QueryBase -from pandas import DataFrame -from sqlalchemy import create_engine - from src.config import Env, RuntimeConfig -from src.logger import log -from src.mappings import DUNE_TO_PG - -DataTypes = dict[str, Any] - - -def reformat_varbinary_columns( - df: DataFrame, varbinary_columns: list[str] -) -> DataFrame: - for col in varbinary_columns: - df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x) - return df - - -def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]: - metadata = result.metadata - dtypes, varbinary_columns = {}, [] - for name, d_type in zip(metadata.column_names, metadata.column_types): - dtypes[name] = DUNE_TO_PG[d_type] - if d_type == "varbinary": - varbinary_columns.append(name) - - df = pd.DataFrame(result.rows) - # escape bytes - df = reformat_varbinary_columns(df, varbinary_columns) - return df, dtypes - - -def fetch_dune_data( - dune: DuneClient, query: QueryBase, ping_frequency: int -) -> tuple[DataFrame, DataTypes]: - result = dune.run_query(query, ping_frequency).result - if result is None: - raise ValueError("Query execution failed!") - return dune_result_to_df(result) - - -def save_to_postgres( - engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes -) -> None: - df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes) - log.info("Data saved to %s successfully!", table_name) +from src.dune_to_local.main import dune_to_postgres def main() -> None: @@ -58,14 +7,7 @@ def main() -> None: config = RuntimeConfig.load_from_toml("config.toml") # TODO: Async job execution https://github.com/bh2smith/dune-sync/issues/20 for job in config.jobs: - df, types = fetch_dune_data( - dune=DuneClient(env.dune_api_key, performance=job.query_engine), - query=QueryBase(job.query_id), - ping_frequency=job.poll_frequency, - ) - if df is not None: - engine = create_engine(env.db_url) - save_to_postgres(engine, job.table_name, df, types) + dune_to_postgres(env, job) if __name__ == "__main__": diff --git a/tests/e2e_test.py b/tests/e2e_test.py index 2dffa59..540a678 100644 --- a/tests/e2e_test.py +++ b/tests/e2e_test.py @@ -9,7 +9,7 @@ from sqlalchemy import create_engine from sqlalchemy.dialects.postgresql import BYTEA -from src.main import save_to_postgres, dune_result_to_df +from src.dune_to_local.main import save_to_postgres, dune_result_to_df from tests.db_util import query_pg DB_URL = getenv("DB_URL", "postgresql://postgres:postgres@localhost:5432/postgres") From 2c2832919057f3f26fd5cabaa469d1088534fe31 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Sat, 26 Oct 2024 20:37:26 +0200 Subject: [PATCH 2/4] add TODO --- src/dune_to_local/main.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/dune_to_local/main.py b/src/dune_to_local/main.py index a0b03b6..daee749 100644 --- a/src/dune_to_local/main.py +++ b/src/dune_to_local/main.py @@ -38,14 +38,19 @@ def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, typ def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, DataTypes]: - result = ( - DuneClient(dune_key, performance=job.query_engine) - .run_query(query=QueryBase(job.query_id), ping_frequency=job.poll_frequency) - .result + dune = DuneClient(dune_key, performance=job.query_engine) + response = dune.run_query( + query=QueryBase( + query_id=job.query_id, + params=[ + # TODO: https://github.com/bh2smith/dune-sync/issues/30 + ], + ), + ping_frequency=job.poll_frequency, ) - if result is None: + if response.result is None: raise ValueError("Query execution failed!") - return dune_result_to_df(result) + return dune_result_to_df(response.result) def save_to_postgres( From ac92f43a526db27393a76b396edf1744384ba514 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Sat, 26 Oct 2024 20:40:31 +0200 Subject: [PATCH 3/4] minor cleanup --- src/dune_to_local/main.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/dune_to_local/main.py b/src/dune_to_local/main.py index daee749..9f7fbd6 100644 --- a/src/dune_to_local/main.py +++ b/src/dune_to_local/main.py @@ -56,13 +56,20 @@ def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, Data def save_to_postgres( engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes ) -> None: - df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes) + if df.empty: + log.warning("DataFrame is empty. Skipping save to PostgreSQL.") + return + with engine.connect() as connection: + df.to_sql( + table_name, connection, if_exists="replace", index=False, dtype=dtypes + ) log.info("Data saved to %s successfully!", table_name) def dune_to_postgres(env: Env, job: DuneToLocalJob) -> None: df, types = fetch_dune_data(env.dune_api_key, job) - if df is not None: + if not df.empty: + # Skip engine creation if unnecessary. engine = create_engine(env.db_url) save_to_postgres(engine, job.table_name, df, types) else: From f943bb94b8efcfb3f72c6290c4ec97db938380f9 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Sat, 26 Oct 2024 21:46:50 +0200 Subject: [PATCH 4/4] file rename --- src/dune_to_local/{main.py => postgres.py} | 0 src/main.py | 2 +- tests/e2e_test.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/dune_to_local/{main.py => postgres.py} (100%) diff --git a/src/dune_to_local/main.py b/src/dune_to_local/postgres.py similarity index 100% rename from src/dune_to_local/main.py rename to src/dune_to_local/postgres.py diff --git a/src/main.py b/src/main.py index 00ebd5d..57156e6 100644 --- a/src/main.py +++ b/src/main.py @@ -1,5 +1,5 @@ from src.config import Env, RuntimeConfig -from src.dune_to_local.main import dune_to_postgres +from src.dune_to_local.postgres import dune_to_postgres def main() -> None: diff --git a/tests/e2e_test.py b/tests/e2e_test.py index 540a678..a7b645b 100644 --- a/tests/e2e_test.py +++ b/tests/e2e_test.py @@ -9,7 +9,7 @@ from sqlalchemy import create_engine from sqlalchemy.dialects.postgresql import BYTEA -from src.dune_to_local.main import save_to_postgres, dune_result_to_df +from src.dune_to_local.postgres import save_to_postgres, dune_result_to_df from tests.db_util import query_pg DB_URL = getenv("DB_URL", "postgresql://postgres:postgres@localhost:5432/postgres")