Skip to content

Commit

Permalink
HotFix: PG to PG is empty bug (#130)
Browse files Browse the repository at this point in the history
There was an inconsistency between the type returned when fetching from
Postgres and the type expected when saving to it.

PG.fetch returned `DataFrame`
but 
PG.save expected `TypedDataFrame`

This resulted in a bug where `data.is_empty` did not exist.

This PR aligns the types naively; this will all be improved in a
follow-up, #129, which assigns proper (non-empty) types to Postgres
dataframes.
  • Loading branch information
bh2smith authored Dec 25, 2024
1 parent f0979a9 commit b130bb7
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
8 changes: 8 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@ jobs:
ref: PG
table_name: cow.solvers
if_exists: replace

- name: p2p-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
destination:
ref: PG
table_name: moo.p2p-test
13 changes: 7 additions & 6 deletions src/sources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

from src.interfaces import Source
from src.interfaces import Source, TypedDataFrame
from src.logger import log


Expand Down Expand Up @@ -41,7 +41,7 @@ def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
return df


class PostgresSource(Source[DataFrame]):
class PostgresSource(Source[TypedDataFrame]):
"""Represent PostgreSQL as a data source for retrieving data via SQL queries.
This class connects to a PostgreSQL database using SQLAlchemy and executes a query
Expand Down Expand Up @@ -100,7 +100,7 @@ def validate(self) -> bool:
log.error("Invalid SQL query: %s", str(e))
return False

async def fetch(self) -> DataFrame:
async def fetch(self) -> TypedDataFrame:
"""Execute the SQL query and retrieves the result as a DataFrame.
Returns
Expand All @@ -121,9 +121,10 @@ async def fetch(self) -> DataFrame:
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)
return _convert_bytea_to_hex(df)
# TODO include types.
return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})

def is_empty(self, data: DataFrame) -> bool:
def is_empty(self, data: TypedDataFrame) -> bool:
"""Check if the provided DataFrame is empty.
Parameters
Expand All @@ -137,7 +138,7 @@ def is_empty(self, data: DataFrame) -> bool:
True if the DataFrame is empty, False otherwise.
"""
return data.empty
return data.is_empty()

def _set_query_string(self, query_string: str) -> None:
"""Set the SQL query string directly or from a file if it ends with '.sql'.
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/sources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.dialects.postgresql import BYTEA

from src.config import RuntimeConfig
from src.interfaces import TypedDataFrame
from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df
from src.sources.postgres import PostgresSource, _convert_bytea_to_hex
from tests import config_root, fixtures_root
Expand Down Expand Up @@ -136,5 +137,4 @@ def test_is_empty(self):
db_url="postgresql://postgres:postgres@localhost:5432/postgres",
query_string="SELECT 1",
)
df = pd.DataFrame([])
self.assertTrue(src.is_empty(df))
self.assertTrue(src.is_empty(TypedDataFrame(pd.DataFrame([]), {})))

0 comments on commit b130bb7

Please sign in to comment.