Simplify Main Entry Point #29

Merged
merged 4 commits on Oct 26, 2024
Empty file added src/dune_to_local/__init__.py
File renamed without changes: src/mappings.py → src/dune_to_local/mappings.py
76 changes: 76 additions & 0 deletions src/dune_to_local/postgres.py
@@ -0,0 +1,76 @@
from typing import Any

import pandas as pd
import sqlalchemy
from dune_client.client import DuneClient
from dune_client.models import ExecutionResult
from dune_client.query import QueryBase
from pandas import DataFrame
from sqlalchemy import create_engine

from src.config import DuneToLocalJob, Env
from src.dune_to_local.mappings import DUNE_TO_PG
from src.logger import log

DataTypes = dict[str, Any]


def reformat_varbinary_columns(
    df: DataFrame, varbinary_columns: list[str]
) -> DataFrame:
    for col in varbinary_columns:
        df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
    return df
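# Example (hypothetical value): a Dune varbinary cell arrives as a 0x-prefixed
# hex string, e.g. "0xdeadbeef"; stripping the prefix and hex-decoding yields
# b"\xde\xad\xbe\xef", while nulls pass through unchanged.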


def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
    metadata = result.metadata
    dtypes, varbinary_columns = {}, []
    for name, d_type in zip(metadata.column_names, metadata.column_types):
        dtypes[name] = DUNE_TO_PG[d_type]
        if d_type == "varbinary":
            varbinary_columns.append(name)

    df = pd.DataFrame(result.rows)
    # convert 0x-prefixed hex strings to raw bytes
    df = reformat_varbinary_columns(df, varbinary_columns)
    return df, dtypes
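# Example (hypothetical metadata): column_names ["block", "hash"] with
# column_types ["bigint", "varbinary"] yields
#     dtypes == {"block": DUNE_TO_PG["bigint"], "hash": DUNE_TO_PG["varbinary"]}
# and marks "hash" for the hex-to-bytes pass above.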


def fetch_dune_data(dune_key: str, job: DuneToLocalJob) -> tuple[DataFrame, DataTypes]:
    dune = DuneClient(dune_key, performance=job.query_engine)
    response = dune.run_query(
        query=QueryBase(
            query_id=job.query_id,
            params=[
                # TODO: https://github.com/bh2smith/dune-sync/issues/30
            ],
        ),
        ping_frequency=job.poll_frequency,
    )
    if response.result is None:
        raise ValueError("Query execution failed!")
    return dune_result_to_df(response.result)
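# Example (hypothetical job): a DuneToLocalJob with query_id=123,
# poll_frequency=5 and query_engine="medium" polls query 123 every 5 seconds
# on Dune's medium engine and returns the rows plus inferred Postgres dtypes.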


def save_to_postgres(
    engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
) -> None:
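    # Note: if_exists="replace" drops and recreates the table on every run,
    # so each sync fully overwrites prior contents.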
    if df.empty:
        log.warning("DataFrame is empty. Skipping save to PostgreSQL.")
        return
    with engine.connect() as connection:
        df.to_sql(
            table_name, connection, if_exists="replace", index=False, dtype=dtypes
        )
        log.info("Data saved to %s successfully!", table_name)


def dune_to_postgres(env: Env, job: DuneToLocalJob) -> None:
    df, types = fetch_dune_data(env.dune_api_key, job)
    if not df.empty:
        # Skip engine creation if unnecessary.
        engine = create_engine(env.db_url)
        save_to_postgres(engine, job.table_name, df, types)
    else:
        log.warning("No Query results found! Skipping write")
62 changes: 2 additions & 60 deletions src/main.py
@@ -1,71 +1,13 @@
from typing import Any

import pandas as pd
import sqlalchemy
from dune_client.client import DuneClient
from dune_client.models import ExecutionResult
from dune_client.query import QueryBase
from pandas import DataFrame
from sqlalchemy import create_engine

from src.config import Env, RuntimeConfig
from src.logger import log
from src.mappings import DUNE_TO_PG

DataTypes = dict[str, Any]


def reformat_varbinary_columns(
    df: DataFrame, varbinary_columns: list[str]
) -> DataFrame:
    for col in varbinary_columns:
        df[col] = df[col].apply(lambda x: bytes.fromhex(x[2:]) if pd.notnull(x) else x)
    return df


def dune_result_to_df(result: ExecutionResult) -> tuple[DataFrame, dict[str, type]]:
    metadata = result.metadata
    dtypes, varbinary_columns = {}, []
    for name, d_type in zip(metadata.column_names, metadata.column_types):
        dtypes[name] = DUNE_TO_PG[d_type]
        if d_type == "varbinary":
            varbinary_columns.append(name)

    df = pd.DataFrame(result.rows)
    # escape bytes
    df = reformat_varbinary_columns(df, varbinary_columns)
    return df, dtypes


def fetch_dune_data(
    dune: DuneClient, query: QueryBase, ping_frequency: int
) -> tuple[DataFrame, DataTypes]:
    result = dune.run_query(query, ping_frequency).result
    if result is None:
        raise ValueError("Query execution failed!")
    return dune_result_to_df(result)


def save_to_postgres(
    engine: sqlalchemy.engine.Engine, table_name: str, df: DataFrame, dtypes: DataTypes
) -> None:
    df.to_sql(table_name, engine, if_exists="replace", index=False, dtype=dtypes)
    log.info("Data saved to %s successfully!", table_name)
from src.dune_to_local.postgres import dune_to_postgres


def main() -> None:
    env = Env.load()
    config = RuntimeConfig.load_from_toml("config.toml")
    # TODO: Async job execution https://github.com/bh2smith/dune-sync/issues/20
    for job in config.jobs:
        df, types = fetch_dune_data(
            dune=DuneClient(env.dune_api_key, performance=job.query_engine),
            query=QueryBase(job.query_id),
            ping_frequency=job.poll_frequency,
        )
        if df is not None:
            engine = create_engine(env.db_url)
            save_to_postgres(engine, job.table_name, df, types)
        dune_to_postgres(env, job)


if __name__ == "__main__":
2 changes: 1 addition & 1 deletion tests/e2e_test.py
@@ -9,7 +9,7 @@
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import BYTEA

from src.main import save_to_postgres, dune_result_to_df
from src.dune_to_local.postgres import save_to_postgres, dune_result_to_df
from tests.db_util import query_pg

DB_URL = getenv("DB_URL", "postgresql://postgres:postgres@localhost:5432/postgres")
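The rest of the test file is collapsed in this view. For orientation, a minimal round-trip in the style of e2e_test.py could look like the following (table name and values are hypothetical; DB_URL as defined above):

# Hypothetical round-trip against a local Postgres; not part of this diff.
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.dialects.postgresql import BYTEA

from src.dune_to_local.postgres import save_to_postgres

engine = create_engine(DB_URL)
df = pd.DataFrame({"hash": [bytes.fromhex("deadbeef")]})
save_to_postgres(engine, "test_table", df, {"hash": BYTEA})
stored = pd.read_sql("SELECT hash FROM test_table", engine)["hash"][0]
assert bytes(stored) == bytes.fromhex("deadbeef")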