diff --git a/config.yaml b/config.yaml
index 8ceb06f..8e32962 100644
--- a/config.yaml
+++ b/config.yaml
@@ -52,7 +52,14 @@ jobs:
   - name: p2p-test
     source:
       ref: PG
-      query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
+      # Literal block scalar: YAML applies no escapes here, so use a single backslash.
+      query_string: |
+        SELECT
+          1 AS number,
+          '\x1234'::bytea AS my_bytes,
+          '{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
+          '[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
+
     destination:
       ref: PG
       table_name: moo.p2p-test
diff --git a/src/sources/postgres.py b/src/sources/postgres.py
index af92922..fbd8d95 100644
--- a/src/sources/postgres.py
+++ b/src/sources/postgres.py
@@ -1,6 +1,7 @@
 """Source logic for PostgreSQL."""
 
 import asyncio
+import json
 from pathlib import Path
 
 import pandas as pd
@@ -13,6 +14,39 @@
 from src.logger import log
 
 
+def _convert_dict_to_json(df: DataFrame) -> DataFrame:
+    """Convert dict/list (JSON) columns to JSON string representation.
+
+    A column is converted when any of its non-null values is a ``dict`` or
+    ``list``; every non-``None`` value in it is then passed through
+    ``json.dumps``. All other columns are left untouched.
+
+    Args:
+        df (DataFrame): Input frame; it is not modified.
+
+    Returns:
+        DataFrame: A converted copy, or ``df`` itself when it is empty.
+
+    """
+    if df.empty:
+        return df
+
+    df = df.copy()
+    for column in df.columns:
+        # dicts and lists can only live in object-dtype columns.
+        if df[column].dtype != object:
+            continue
+        non_null_values = df[column].dropna()
+        # Scan every value, not only the first one: a JSON column can start
+        # with a scalar (e.g. 1 or "a") and hold objects further down.
+        if any(isinstance(value, (dict, list)) for value in non_null_values):
+            df[column] = df[column].apply(
+                lambda x: json.dumps(x) if x is not None else None
+            )
+
+    return df
+
+
 def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
     """Convert PostgreSQL BYTEA columns to hexadecimal string representation.
 
@@ -121,8 +155,11 @@ async def fetch(self) -> TypedDataFrame:
         df = await loop.run_in_executor(
             None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
         )
+
+        df = _convert_dict_to_json(df)
+        df = _convert_bytea_to_hex(df)
         # TODO include types.
-        return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})
+        return TypedDataFrame(dataframe=df, types={})
 
     def is_empty(self, data: TypedDataFrame) -> bool:
         """Check if the provided DataFrame is empty.
diff --git a/tests/unit/sources_test.py b/tests/unit/sources_test.py
index 2dc2bcf..924cbda 100644
--- a/tests/unit/sources_test.py
+++ b/tests/unit/sources_test.py
@@ -11,7 +11,11 @@
 from src.config import RuntimeConfig
 from src.interfaces import TypedDataFrame
 from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df
-from src.sources.postgres import PostgresSource, _convert_bytea_to_hex
+from src.sources.postgres import (
+    PostgresSource,
+    _convert_bytea_to_hex,
+    _convert_dict_to_json,
+)
 from tests import config_root, fixtures_root
 
 INVALID_CONFIG_MESSAGE = (
@@ -77,6 +81,46 @@ def test_convert_bytea_to_hex(self):
         result = _convert_bytea_to_hex(df)
         pd.testing.assert_frame_equal(pd.DataFrame([]), result)
 
+    def test_convert_dict_to_json(self):
+        """JSON-like columns become JSON strings; other columns are kept."""
+        # Test data with dictionary, list and plain columns
+        data = {
+            "dict_col": [{"key": "value"}, {"nested": {"a": 1}}, None],
+            "normal_col": [1, 2, 3],
+            "str_col": ["a", "b", "c"],
+            "list_dict": [[{"x": 1}, {"y": 2}], None, [{"z": 3}]],
+        }
+        df = pd.DataFrame(data)
+
+        result = _convert_dict_to_json(df)
+
+        # Assert dictionary column was converted to JSON strings
+        assert result["dict_col"].tolist() == [
+            '{"key": "value"}',
+            '{"nested": {"a": 1}}',
+            None,
+        ]
+
+        # Assert list of dictionaries was converted
+        assert result["list_dict"].tolist() == [
+            '[{"x": 1}, {"y": 2}]',
+            None,
+            '[{"z": 3}]',
+        ]
+
+        # Assert normal column remains unchanged
+        assert result["normal_col"].tolist() == [1, 2, 3]
+
+        # Assert object column without dicts/lists is left alone
+        assert result["str_col"].tolist() == ["a", "b", "c"]
+
+        # Assert the input frame was not modified in place
+        assert df["dict_col"].tolist() == data["dict_col"]
+
+        # Test empty DataFrame
+        df_empty = pd.DataFrame([])
+        result_empty = _convert_dict_to_json(df_empty)
+        pd.testing.assert_frame_equal(pd.DataFrame([]), result_empty)
+
 
 class TestPostgresSource(unittest.TestCase):
     @classmethod