Skip to content

Commit

Permalink
Test type converter
Browse files Browse the repository at this point in the history
  • Loading branch information
bh2smith committed Dec 26, 2024
1 parent 9b27fc2 commit de095f9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 5 deletions.
3 changes: 2 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ jobs:
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
'[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
destination:
ref: PG
Expand Down
15 changes: 12 additions & 3 deletions src/sources/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,20 @@


def _convert_dict_to_json(df: DataFrame) -> DataFrame:
"""Convert dictionary columns to JSON strings."""
if df.empty:
return df

df = df.copy()
for column in df.columns:
if isinstance(df[column].iloc[0], dict):
df[column] = df[column].apply(json.dumps)
# Get first non-null value to check type
non_null_values = df[column][df[column].notna()]
if len(non_null_values) > 0:
first_value = non_null_values.iloc[0]
if isinstance(first_value, dict | list):
df[column] = df[column].apply(
lambda x: json.dumps(x) if x is not None else None
)

return df


Expand Down
35 changes: 34 additions & 1 deletion tests/unit/sources_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from src.config import RuntimeConfig
from src.interfaces import TypedDataFrame
from src.sources.dune import _reformat_varbinary_columns, dune_result_to_df
from src.sources.postgres import PostgresSource, _convert_bytea_to_hex
from src.sources.postgres import (
PostgresSource,
_convert_bytea_to_hex,
_convert_dict_to_json,
)
from tests import config_root, fixtures_root

INVALID_CONFIG_MESSAGE = (
Expand Down Expand Up @@ -77,6 +81,35 @@ def test_convert_bytea_to_hex(self):
result = _convert_bytea_to_hex(df)
pd.testing.assert_frame_equal(pd.DataFrame([]), result)

def test_convert_dict_to_json(self):
# Test data with dictionary and normal columns
data = {
"dict_col": [{"key": "value"}, {"nested": {"a": 1}}, None],
"normal_col": [1, 2, 3],
"list_dict": [[{"x": 1}, {"y": 2}], None, [{"z": 3}]],
}
df = pd.DataFrame(data)

result = _convert_dict_to_json(df)

# Assert dictionary column was converted to JSON strings
assert result["dict_col"].tolist() == [
'{"key": "value"}',
'{"nested": {"a": 1}}',
None,
]

# Assert list of dictionaries was converted
assert result["list_dict"].tolist() == ['[{"x": 1}, {"y": 2}]', None, '[{"z": 3}]']

# Assert normal column remains unchanged
assert result["normal_col"].tolist() == [1, 2, 3]

# Test empty DataFrame
df_empty = pd.DataFrame([])
result_empty = _convert_dict_to_json(df_empty)
pd.testing.assert_frame_equal(pd.DataFrame([]), result_empty)


class TestPostgresSource(unittest.TestCase):
@classmethod
Expand Down

0 comments on commit de095f9

Please sign in to comment.