Skip to content

Commit

Permalink
PG Dict to JSON (#132)
Browse files Browse the repository at this point in the history
After fetching from Postgres, dict values must be converted to JSON strings. List (array) values are preemptively converted as well.

Closes #131
  • Loading branch information
bh2smith authored Dec 26, 2024
1 parent b130bb7 commit 945e327
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 3 deletions.
8 changes: 7 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ jobs:
- name: p2p-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
query_string: |
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
'[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
destination:
ref: PG
table_name: moo.p2p-test
24 changes: 23 additions & 1 deletion src/sources/postgres.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Source logic for PostgreSQL."""

import asyncio
import json
from pathlib import Path

import pandas as pd
Expand All @@ -13,6 +14,24 @@
from src.logger import log


def _convert_dict_to_json(df: DataFrame) -> DataFrame:
if df.empty:
return df

df = df.copy()
for column in df.columns:
# Get first non-null value to check type
non_null_values = df[column][df[column].notna()]
if len(non_null_values) > 0:
first_value = non_null_values.iloc[0]
if isinstance(first_value, dict | list):
df[column] = df[column].apply(
lambda x: json.dumps(x) if x is not None else None
)

return df


def _convert_bytea_to_hex(df: DataFrame) -> DataFrame:
"""Convert PostgreSQL BYTEA columns to hexadecimal string representation.
Expand Down Expand Up @@ -121,8 +140,11 @@ async def fetch(self) -> TypedDataFrame:
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
)

df = _convert_dict_to_json(df)
df = _convert_bytea_to_hex(df)
# TODO include types.
return TypedDataFrame(dataframe=_convert_bytea_to_hex(df), types={})
return TypedDataFrame(dataframe=df, types={})

def is_empty(self, data: TypedDataFrame) -> bool:
"""Check if the provided DataFrame is empty.
Expand Down
39 changes: 38 additions & 1 deletion tests/unit/sources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from src.config import RuntimeConfig
from src.interfaces import TypedDataFrame
from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df
from src.sources.postgres import PostgresSource, _convert_bytea_to_hex
from src.sources.postgres import (
PostgresSource,
_convert_bytea_to_hex,
_convert_dict_to_json,
)
from tests import config_root, fixtures_root

INVALID_CONFIG_MESSAGE = (
Expand Down Expand Up @@ -77,6 +81,39 @@ def test_convert_bytea_to_hex(self):
result = _convert_bytea_to_hex(df)
pd.testing.assert_frame_equal(pd.DataFrame([]), result)

def test_convert_dict_to_json(self):
    """Check JSON serialization of dict/list columns and the empty-frame path."""
    frame = pd.DataFrame(
        {
            "dict_col": [{"key": "value"}, {"nested": {"a": 1}}, None],
            "normal_col": [1, 2, 3],
            "list_dict": [[{"x": 1}, {"y": 2}], None, [{"z": 3}]],
        }
    )

    converted = _convert_dict_to_json(frame)

    # Expected contents per column, in the same order the checks run:
    # dict column -> JSON strings, list-of-dict column -> JSON strings,
    # plain integer column -> untouched.
    expected_by_column = {
        "dict_col": [
            '{"key": "value"}',
            '{"nested": {"a": 1}}',
            None,
        ],
        "list_dict": [
            '[{"x": 1}, {"y": 2}]',
            None,
            '[{"z": 3}]',
        ],
        "normal_col": [1, 2, 3],
    }
    for column_name, expected_values in expected_by_column.items():
        assert converted[column_name].tolist() == expected_values

    # An empty frame must come back as an equal empty frame.
    blank = pd.DataFrame([])
    converted_blank = _convert_dict_to_json(blank)
    pd.testing.assert_frame_equal(pd.DataFrame([]), converted_blank)


class TestPostgresSource(unittest.TestCase):
@classmethod
Expand Down

0 comments on commit 945e327

Please sign in to comment.