Simplify Main Entry Point (#29)
No logical changes have been made here, except that there is now a new function `dune_to_postgres(env: Env, job: DuneToLocalJob) -> None`, which is called from the main entry point.

I am not terribly fond of the new file name `dune_to_local/postgres.py` - open to suggestions.
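
For reference, the simplified entry point (excerpted verbatim from the `src/main.py` diff below) now reduces to:

```python
from src.config import Env, RuntimeConfig
from src.dune_to_local.postgres import dune_to_postgres


def main() -> None:
    env = Env.load()
    config = RuntimeConfig.load_from_toml("config.toml")
    # TODO: Async job execution https://github.com/bh2smith/dune-sync/issues/20
    for job in config.jobs:
        dune_to_postgres(env, job)
```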
bh2smith authored Oct 26, 2024
1 parent 01a2802 commit 7075728
Showing 5 changed files with 79 additions and 61 deletions.
Empty file added src/dune_to_local/__init__.py
File renamed without changes: src/mappings.py → src/dune_to_local/mappings.py.
76 changes: 76 additions & 0 deletions src/dune_to_local/postgres.py
@@ -0,0 +1,76 @@
from typing import Any

import pandas as pd
import sqlalchemy
from dune_client.client import DuneClient
from dune_client.models import ExecutionResult
from dune_client.query import QueryBase
from pandas import DataFrame
from sqlalchemy import create_engine

from src.config import DuneToLocalJob, Env
from src.dune_to_local.mappings import DUNE_TO_PG
from src.logger import log

DataTypes = dict[str, Any]


def reformat_varbinary_columns(
df: DataFrame, varbinary_columns: list[str]
) -> DataFrame:
for col in varbinary_columns:
df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
return df


def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
metadata = result.metadata
dtypes, varbinary_columns = {}, []
for name, d_type in zip(metadata.column_names, metadata.column_types):
dtypes[name] = DUNE_TO_PG[d_type]
if d_type == "varbinary":
varbinary_columns.append(name)

df = pd.DataFrame(result.rows)
# escape bytes
df = reformat_varbinary_columns(df, varbinary_columns)
return df, dtypes


def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, DataTypes]:
dune = DuneClient(dune_key, performance=job.query_engine)
response = dune.run_query(
query=QueryBase(
query_id=job.query_id,
params=[
# TODO: https://github.com/bh2smith/dune-sync/issues/30
],
),
ping_frequency=job.poll_frequency,
)
if response.result is None:
raise ValueError("Query execution failed!")
return dune_result_to_df(response.result)


def save_to_postgres(
engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
) -> None:
if df.empty:
log.warning("DataFrame is empty. Skipping save to PostgreSQL.")
return
with engine.connect() as connection:
df.to_sql(
table_name, connection, if_exists="replace", index=False, dtype=dtypes
)
log.info("Data saved to %s successfully!", table_name)


def dune_to_postgres(env: Env, job: DuneToLocalJob) -> None:
df, types = fetch_dune_data(env.dune_api_key, job)
if not df.empty:
# Skip engine creation if unnecessary.
engine = create_engine(env.db_url)
save_to_postgres(engine, job.table_name, df, types)
else:
log.warning("No Query results found! Skipping write")
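
As a usage note, here is a minimal sketch of invoking the new function for a single job outside the config loop. The `DuneToLocalJob` keyword arguments are assumptions inferred from the attributes referenced above (`query_id`, `poll_frequency`, `query_engine`, `table_name`); the actual constructor signature is not shown in this commit. Only `Env.load()` and `dune_to_postgres` appear in this diff:

```python
from src.config import DuneToLocalJob, Env
from src.dune_to_local.postgres import dune_to_postgres

env = Env.load()  # reads the Dune API key and DB URL used below

# Hypothetical job: field names mirror the attributes used in postgres.py.
job = DuneToLocalJob(
    query_id=1234567,       # placeholder Dune query id
    table_name="my_table",
    poll_frequency=5,       # seconds between execution-status polls
    query_engine="medium",  # Dune performance tier passed to DuneClient
)

dune_to_postgres(env, job)
```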
62 changes: 2 additions & 60 deletions src/main.py
@@ -1,71 +1,13 @@
-from typing import Any
-
-import pandas as pd
-import sqlalchemy
-from dune_client.client import DuneClient
-from dune_client.models import ExecutionResult
-from dune_client.query import QueryBase
-from pandas import DataFrame
-from sqlalchemy import create_engine
-
 from src.config import Env, RuntimeConfig
-from src.logger import log
-from src.mappings import DUNE_TO_PG
-
-DataTypes = dict[str, Any]
-
-
-def reformat_varbinary_columns(
-    df: DataFrame, varbinary_columns: list[str]
-) -> DataFrame:
-    for col in varbinary_columns:
-        df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
-    return df
-
-
-def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
-    metadata = result.metadata
-    dtypes, varbinary_columns = {}, []
-    for name, d_type in zip(metadata.column_names, metadata.column_types):
-        dtypes[name] = DUNE_TO_PG[d_type]
-        if d_type == "varbinary":
-            varbinary_columns.append(name)
-
-    df = pd.DataFrame(result.rows)
-    # escape bytes
-    df = reformat_varbinary_columns(df, varbinary_columns)
-    return df, dtypes
-
-
-def fetch_dune_data(
-    dune: DuneClient, query: QueryBase, ping_frequency: int
-) -> tuple[DataFrame, DataTypes]:
-    result = dune.run_query(query, ping_frequency).result
-    if result is None:
-        raise ValueError("Query execution failed!")
-    return dune_result_to_df(result)
-
-
-def save_to_postgres(
-    engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
-) -> None:
-    df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes)
-    log.info("Data saved to %s successfully!", table_name)
+from src.dune_to_local.postgres import dune_to_postgres


 def main() -> None:
     env = Env.load()
     config = RuntimeConfig.load_from_toml("config.toml")
     # TODO: Async job execution https://github.com/bh2smith/dune-sync/issues/20
     for job in config.jobs:
-        df, types = fetch_dune_data(
-            dune=DuneClient(env.dune_api_key, performance=job.query_engine),
-            query=QueryBase(job.query_id),
-            ping_frequency=job.poll_frequency,
-        )
-        if df is not None:
-            engine = create_engine(env.db_url)
-            save_to_postgres(engine, job.table_name, df, types)
+        dune_to_postgres(env, job)


 if __name__ == "__main__":
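
The `# TODO` in `main()` points at issue #20 (async job execution). Purely as an illustration, not the project's planned implementation, one way to run the blocking jobs concurrently is `asyncio.to_thread`:

```python
import asyncio

from src.config import Env, RuntimeConfig
from src.dune_to_local.postgres import dune_to_postgres


async def main_async() -> None:
    env = Env.load()
    config = RuntimeConfig.load_from_toml("config.toml")
    # Each job is synchronous, so push it onto a worker thread
    # and await all of them together.
    await asyncio.gather(
        *(asyncio.to_thread(dune_to_postgres, env, job) for job in config.jobs)
    )


if __name__ == "__main__":
    asyncio.run(main_async())
```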
2 changes: 1 addition & 1 deletion tests/e2e_test.py
@@ -9,7 +9,7 @@
 from sqlalchemy import create_engine
 from sqlalchemy.dialects.postgresql import BYTEA

-from src.main import save_to_postgres, dune_result_to_df
+from src.dune_to_local.postgres import save_to_postgres, dune_result_to_df
 from tests.db_util import query_pg

 DB_URL = getenv("DB_URL", "postgresql://postgres:postgres@localhost:5432/postgres")
