From b130bb7820815d8a54ed9bb322a5028074b4cfa4 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Wed, 25 Dec 2024 10:01:12 -0500 Subject: [PATCH] HotFix: PG to PG is empty bug (#130) There was an inconsistency between the types returned from Postgres and those being passed in. PG.fetch returned `DataFrame` but PG.save expected `TypedDataFrame`. This resulted in a bug where data.is_empty did not exist. This PR aligns the types naively, but this will all be improved in a follow-up, #129, which assigns proper (non-empty) types to Postgres dataframes. --- config.yaml | 8 ++++++++ src/sources/postgres.py | 13 +++++++------ tests/unit/sources_test.py | 4 ++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/config.yaml b/config.yaml index 69149cb..8ceb06f 100644 --- a/config.yaml +++ b/config.yaml @@ -48,3 +48,11 @@ jobs: ref: PG table_name: cow.solvers if_exists: replace + + - name: p2p-test + source: + ref: PG + query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;" + destination: + ref: PG + table_name: moo.p2p-test diff --git a/src/sources/postgres.py b/src/sources/postgres.py index 07021f5..af92922 100644 --- a/src/sources/postgres.py +++ b/src/sources/postgres.py @@ -9,7 +9,7 @@ from sqlalchemy import create_engine, text from sqlalchemy.exc import SQLAlchemyError -from src.interfaces import Source +from src.interfaces import Source, TypedDataFrame from src.logger import log @@ -41,7 +41,7 @@ def _convert_bytea_to_hex(df: DataFrame) -> DataFrame: return df -class PostgresSource(Source[DataFrame]): +class PostgresSource(Source[TypedDataFrame]): """Represent PostgreSQL as a data source for retrieving data via SQL queries. This class connects to a PostgreSQL database using SQLAlchemy and executes a query @@ -100,7 +100,7 @@ def validate(self) -> bool: log.error("Invalid SQL query: %s", str(e)) return False - async def fetch(self) -> DataFrame: + async def fetch(self) -> TypedDataFrame: """Execute the SQL query and retrieves the result as a DataFrame. 
Returns @@ -121,9 +121,10 @@ async def fetch(self) -> DataFrame: df = await loop.run_in_executor( None, lambda: pd.read_sql_query(self.query_string, con=self.engine) ) - return _convert_bytea_to_hex(df) + # TODO include types. + return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={}) - def is_empty(self, data: DataFrame) -> bool: + def is_empty(self, data: TypedDataFrame) -> bool: """Check if the provided DataFrame is empty. Parameters @@ -137,7 +138,7 @@ def is_empty(self, data: DataFrame) -> bool: True if the DataFrame is empty, False otherwise. """ - return data.empty + return data.is_empty() def _set_query_string(self, query_string: str) -> None: """Set the SQL query string directly or from a file if it ends with '.sql'. diff --git a/tests/unit/sources_test.py b/tests/unit/sources_test.py index fa639f3..2dc2bcf 100644 --- a/tests/unit/sources_test.py +++ b/tests/unit/sources_test.py @@ -9,6 +9,7 @@ from sqlalchemy.dialects.postgresql import BYTEA from src.config import RuntimeConfig +from src.interfaces import TypedDataFrame from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df from src.sources.postgres import PostgresSource, _convert_bytea_to_hex from tests import config_root, fixtures_root @@ -136,5 +137,4 @@ def test_is_empty(self): db_url="postgresql://postgres:postgres@localhost:5432/postgres", query_string="SELECT 1", ) - df = pd.DataFrame([]) - self.assertTrue(src.is_empty(df)) + self.assertTrue(src.is_empty(TypedDataFrame(pd.DataFrame([]), {})))