diff --git a/config.yaml b/config.yaml
index 8ceb06f..9f0d7a2 100644
--- a/config.yaml
+++ b/config.yaml
@@ -52,7 +52,12 @@ jobs:
   - name: p2p-test
     source:
       ref: PG
-      query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
+      query_string: |
+        SELECT
+          1 AS number,
+          '\x1234'::bytea AS my_bytes,
+          '{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json
+
     destination:
       ref: PG
       table_name: moo.p2p-test
diff --git a/src/sources/postgres.py b/src/sources/postgres.py
index af92922..7f38890 100644
--- a/src/sources/postgres.py
+++ b/src/sources/postgres.py
@@ -1,6 +1,7 @@
 """Source logic for PostgreSQL."""
 
 import asyncio
+import json
 from pathlib import Path
 
 import pandas as pd
@@ -13,6 +14,23 @@
 from src.logger import log
 
 
+def _convert_dict_to_json(df: DataFrame) -> DataFrame:
+    """Convert dictionary columns to JSON strings.
+
+    A column is converted when any of its values is a ``dict``, so the
+    result does not depend on the first row: empty frames and columns
+    that start with NULL are handled. Every value of a converted column
+    is passed through ``json.dumps``. The input DataFrame is not modified.
+    """
+    df = df.copy()
+    # Only object-dtype columns can hold dicts; skip the rest cheaply.
+    for column in df.select_dtypes(include="object").columns:
+        # any() over an empty column is False, so no row is ever indexed.
+        if any(isinstance(value, dict) for value in df[column]):
+            df[column] = df[column].apply(json.dumps)
+    return df
+
+
 def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
     """Convert PostgreSQL BYTEA columns to hexadecimal string representation.
 
@@ -121,8 +139,11 @@ async def fetch(self) -> TypedDataFrame:
         df = await loop.run_in_executor(
             None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
         )
+
+        df = _convert_dict_to_json(df)
+        df = _convert_bytea_to_hex(df)
         # TODO include types.
-        return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})
+        return TypedDataFrame(dataframe=df, types={})
 
     def is_empty(self, data: TypedDataFrame) -> bool:
         """Check if the provided DataFrame is empty.