Skip to content

Commit

Permalink
PG Dict to JSON
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith committed Dec 26, 2024
1 parent b130bb7 commit 9b27fc2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
7 changes: 6 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ jobs:
- name: p2p-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
query_string: |
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json
destination:
ref: PG
table_name: moo.p2p-test
15 changes: 14 additions & 1 deletion src/sources/postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Source logic for PostgreSQL."""

import asyncio
import json
from pathlib import Path

import pandas as pd
Expand All @@ -13,6 +14,15 @@
from src.logger import log


def _convert_dict_to_json(df: DataFrame) -> DataFrame:
"""Convert dictionary columns to JSON strings."""
df = df.copy()
for column in df.columns:
if isinstance(df[column].iloc[0], dict):
df[column] = df[column].apply(json.dumps)
return df


def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
"""Convert PostgreSQL BYTEA columns to hexadecimal string representation.
Expand Down Expand Up @@ -121,8 +131,11 @@ async def fetch(self) -> TypedDataFrame:
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)

df = _convert_dict_to_json(df)
df = _convert_bytea_to_hex(df)
# TODO include types.
return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})
return TypedDataFrame(dataframe=df, types={})

def is_empty(self, data: TypedDataFrame) -> bool:
"""Check if the provided DataFrame is empty.
Expand Down

0 comments on commit 9b27fc2

Please sign in to comment.