diff --git a/config.yaml b/config.yaml
index 946e9f2..64f5047 100644
--- a/config.yaml
+++ b/config.yaml
@@ -33,11 +33,17 @@ jobs:
   - name: p2d-test
     source:
       ref: PG
-      query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
+      # No escape processing in a "|" block: one backslash keeps \x1234 as hex bytea.
+      query_string: |
+        SELECT
+          1 AS number,
+          '\x1234'::bytea AS my_bytes,
+          '{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
+          '[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
     destination:
       ref: Dune
       table_name: bh2smith.dune_sync_test
-      if_exists: append
+      insertion_type: replace
 
   - name: cow-solvers
     source:
diff --git a/src/config.py b/src/config.py
index 0be9dad..788c250 100644
--- a/src/config.py
+++ b/src/config.py
@@ -305,8 +305,6 @@ def _build_destination(
                 f' "{dest_config["ref"]}" defined in config'
             ) from e
 
-        if_exists = dest_config.get("if_exists", "append")
-
         match dest.type:
             case Database.DUNE:
                 try:
@@ -325,14 +323,14 @@ def _build_destination(
                     api_key=dest.key,
                     table_name=dest_config["table_name"],
                     request_timeout=request_timeout,
-                    if_exists=if_exists,
+                    insertion_type=dest_config.get("insertion_type", "append"),
                 )
 
             case Database.POSTGRES:
                 return PostgresDestination(
                     db_url=dest.key,
                     table_name=dest_config["table_name"],
-                    if_exists=if_exists,
+                    if_exists=dest_config.get("if_exists", "append"),
                     index_columns=dest_config.get("index_columns", []),
                 )
         raise ValueError(f"Unsupported destination_db type: {dest}")
diff --git a/src/destinations/common.py b/src/destinations/common.py
index 314882a..ed89f80 100644
--- a/src/destinations/common.py
+++ b/src/destinations/common.py
@@ -1,5 +1 @@
 """Common structures used in multiple destination implementations."""
-
-from typing import Literal
-
-TableExistsPolicy = Literal["append", "replace", "upsert", "insert_ignore"]
diff --git a/src/destinations/dune.py b/src/destinations/dune.py
index ef44cde..6be4ce9 100644
--- a/src/destinations/dune.py
+++ b/src/destinations/dune.py
@@ -1,14 +1,19 @@
 """Destination logic for Dune Analytics."""
 
 import io
+from typing import Literal
 
 from dune_client.client import DuneClient
 from dune_client.models import DuneError, QueryFailed
+from dune_client.query import QueryBase
+from dune_client.types import QueryParameter
+from pandas import DataFrame
 
-from src.destinations.common import TableExistsPolicy
 from src.interfaces import Destination, TypedDataFrame
 from src.logger import log
 
+InsertionPolicy = Literal["append", "replace", "upload_csv"]
+
 
 class DuneDestination(Destination[TypedDataFrame]):
     """A class representing as Dune as a destination.
@@ -34,20 +39,14 @@ def __init__(
         api_key: str,
         table_name: str,
         request_timeout: int,
-        if_exists: TableExistsPolicy = "append",
+        insertion_type: InsertionPolicy = "append",
     ):
         self.client = DuneClient(api_key, request_timeout=request_timeout)
         if "." not in table_name:
             raise ValueError("Table name must be in the format namespace.table_name")
 
         self.table_name: str = table_name
-        if if_exists not in {"append", "replace"}:
-            # TODO - Dune (support insert_ignore & upsert on table endpoints).
-            raise ValueError(
-                "Unsupported Table Existence Policy! "
-                "if_exists must be 'append' or 'replace'"
-            )
-        self.if_exists: TableExistsPolicy = if_exists
+        self.insertion_type: InsertionPolicy = insertion_type
         super().__init__()
 
     def validate(self) -> bool:
@@ -79,38 +78,67 @@ def save(self, data: TypedDataFrame) -> int:
         """
         try:
             log.debug("Uploading DF to Dune...")
-            namespace, table_name = self._get_namespace_and_table_name()
-            if not self._skip_create():
-                self.client.create_table(
-                    namespace,
-                    table_name,
-                    schema=[
-                        {"name": name, "type": dtype}
-                        for name, dtype in data.types.items()
-                    ],
-                )
-            result = self.client.insert_table(
-                namespace,
-                table_name,
-                data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
-                content_type="text/csv",
-            )
-            if not result:
-                raise RuntimeError("Dune Upload Failed")
-            log.debug("Inserted DF to Dune, %s", result)
+            if self.insertion_type == "upload_csv":
+                self._upload_csv(data.dataframe)
+            else:
+                self._insert(data)
+            log.debug("Inserted DF to Dune, %s", self.table_name)
         except DuneError as dune_e:
             log.error("Dune did not accept our upload: %s", dune_e)
         except (ValueError, RuntimeError) as e:
             log.error("Data processing error: %s", e)
         return len(data)
 
-    def _skip_create(self) -> bool:
-        return self.if_exists == "append" and self._table_exists()
+    def _insert(self, data: TypedDataFrame) -> None:
+        """Insert ``data`` via the Dune table endpoints.
+
+        ``replace`` deletes the table first; a table reported missing is
+        (re)created from ``data.types`` before the CSV payload is inserted.
+        Raises ``RuntimeError`` when the create call returns a falsy result.
+        """
+        namespace, table_name = self._get_namespace_and_table_name()
+        if self.insertion_type == "replace":
+            log.warning("Replacement feature is unstable!")
+            log.info("Deleting table: %s", table_name)
+            delete = self.client.delete_table(namespace, table_name)
+            log.info("Deleted: %s", delete)
+
+        if not self._table_exists():
+            log.info("Creating table: %s", self.table_name)
+            create = self.client.create_table(
+                namespace,
+                table_name,
+                schema=[
+                    {"name": name, "type": dtype} for name, dtype in data.types.items()
+                ],
+            )
+            if not create:
+                raise RuntimeError("Dune Upload Failed")
+            log.info("Created: %s", create)
+        log.info("Inserting to: %s", self.table_name)
+        self.client.insert_table(
+            namespace,
+            table_name,
+            data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
+            content_type="text/csv",
+        )
+
+    def _upload_csv(self, data: DataFrame) -> None:
+        """Send ``data`` as CSV text through the client's ``upload_csv`` call."""
+        # ``data`` is already the raw DataFrame (``save`` passes
+        # ``data.dataframe``), so it is serialized directly.
+        self.client.upload_csv(self.table_name, data.to_csv(index=False))
 
     def _table_exists(self) -> bool:
+        """Report whether the lookup query returns a result (False on QueryFailed)."""
         try:
-            self.client.run_sql(f"SELECT count(*) FROM dune.{self.table_name}")
-            return True
+            results = self.client.run_query(
+                QueryBase(
+                    4554525,
+                    params=[QueryParameter.text_type("table_name", self.table_name)],
+                )
+            )
+            return results.result is not None
         except QueryFailed:
             return False
 
diff --git a/src/destinations/postgres.py b/src/destinations/postgres.py
index 0256809..be1d2f1 100644
--- a/src/destinations/postgres.py
+++ b/src/destinations/postgres.py
@@ -11,10 +11,11 @@
 )
 from sqlalchemy.dialects.postgresql import insert
 
-from src.destinations.common import TableExistsPolicy
 from src.interfaces import Destination, TypedDataFrame
 from src.logger import log
 
+TableExistsPolicy = Literal["append", "replace", "upsert", "insert_ignore"]
+
 
 class PostgresDestination(Destination[TypedDataFrame]):
     """A class representing PostgreSQL as a destination for data storage.
diff --git a/src/sources/postgres.py b/src/sources/postgres.py
index 05eee49..b4de256 100644
--- a/src/sources/postgres.py
+++ b/src/sources/postgres.py
@@ -142,7 +142,8 @@ async def fetch(self) -> TypedDataFrame:
         with self.engine.connect() as conn:
             result = conn.execute(text(self.query_string))
             types = {
-                col.name: PG_TO_DUNE[col.type_code] for col in result.cursor.description
+                col.name: PG_TO_DUNE.get(col.type_code, "varchar")
+                for col in result.cursor.description
             }
         df = await loop.run_in_executor(
             None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
diff --git a/tests/unit/destinations_test.py b/tests/unit/destinations_test.py
index bbe4191..5ff2767 100644
--- a/tests/unit/destinations_test.py
+++ b/tests/unit/destinations_test.py
@@ -32,41 +32,36 @@ def test_init_validation(self):
                 api_key="anything",
                 table_name="INVALID_TABLE_NAME",
                 request_timeout=10,
-                if_exists="replace",
+                insertion_type="replace",
             )
         self.assertIn(
             "Table name must be in the format namespace.table_name",
             ctx.exception.args[0],
         )
 
-        with self.assertRaises(ValueError) as ctx:
-            DuneDestination(
-                api_key="anything",
-                table_name="table.name",
-                request_timeout=10,
-                if_exists="upsert",
-            )
-        self.assertIn("Unsupported Table Existence Policy!", ctx.exception.args[0])
-
     def test_table_exists(self):
         mock_client = Mock()
+
         dest = DuneDestination(
             api_key="anything",
             table_name="table.name",
             request_timeout=10,
-            if_exists="append",
+            insertion_type="append",
         )
         dest.client = mock_client
-        mock_client.run_sql.return_value = None  # Not Raise!
+        mock_result = Mock()
+        mock_result.result = "non-empty-result"
+        mock_client.run_query.return_value = mock_result
         self.assertEqual(True, dest._table_exists())
 
-        mock_client.run_sql.side_effect = QueryFailed("Table not found")
+        mock_client.run_query.side_effect = QueryFailed("Table not found")
         self.assertEqual(False, dest._table_exists())
 
-    @patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
+    @patch("dune_client.api.table.TableAPI.delete_table", name="Fake Table Deleter")
     @patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
+    @patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
     def test_ensure_index_disabled_when_uploading(
-        self, mock_create_table, mock_insert_table, *_
+        self, mock_create_table, mock_insert_table, mock_delete_table, *_
     ):
         mock_create_table.return_value = {
             "namespace": "my_user",
@@ -76,7 +71,7 @@ def test_ensure_index_disabled_when_uploading(
             "already_existed": False,
             "message": "Table created successfully",
         }
-
+        mock_delete_table.return_value = {"message": "Table deleted successfully"}
         mock_insert_table.return_value = {"rows_written": 9000, "bytes_written": 90}
 
         dummy_df = TypedDataFrame(
@@ -88,19 +83,19 @@ def test_ensure_index_disabled_when_uploading(
             ),
             types={"foo": "varchar", "baz": "varchar"},
         )
-        mock_client = Mock()
         destination = DuneDestination(
             api_key=os.getenv("DUNE_API_KEY"),
             table_name="foo.bar",
             request_timeout=10,
-            if_exists="replace",
+            insertion_type="replace",
         )
+        destination._table_exists = Mock(return_value=True)
 
         with self.assertLogs(level=DEBUG) as logs:
             destination.save(dummy_df)
 
         self.assertIn("Uploading DF to Dune", logs.output[0])
-        self.assertIn("Inserted DF to Dune,", logs.output[1])
+        self.assertIn("Inserted DF to Dune,", logs.output[-1])
 
     @patch("pandas.core.generic.NDFrame.to_csv", name="Fake csv writer")
     def test_duneclient_sets_timeout(self, mock_to_csv, *_):
@@ -114,13 +109,18 @@ def test_duneclient_sets_timeout(self, mock_to_csv, *_):
 
     @patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
     @patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
-    def test_dune_error_handling(self, mock_create_table, mock_insert_table):
+    @patch("dune_client.api.table.TableAPI.delete_table", name="Fake Table Deleter")
+    def test_dune_error_handling(
+        self, mock_delete_table, mock_insert_table, mock_create_table, *_
+    ):
         dest = DuneDestination(
             api_key="f00b4r",
             table_name="foo.bar",
             request_timeout=10,
-            if_exists="replace",
+            insertion_type="replace",
         )
+        dest._table_exists = Mock(return_value=False)
+
         df = TypedDataFrame(pd.DataFrame([{"foo": "bar"}]), {})
 
         mock_create_table.return_value = {
@@ -131,6 +131,9 @@ def test_dune_error_handling(self, mock_create_table, mock_insert_table):
             "already_existed": False,
             "message": "Table created successfully",
         }
+        mock_delete_table.return_value = {
+            "message": "Table bh2smith.dune_sync_test successfully deleted"
+        }
         mock_insert_table.return_value = {"rows_written": 9000, "bytes_written": 90}
         dune_err = DuneError(
             data={"error": "bad stuff"},
@@ -173,17 +176,24 @@ def test_dune_error_handling(self, mock_create_table, mock_insert_table):
             expected_message = "Data processing error: Big Oops"
             self.assertIn(expected_message, logs.output[0])
 
+        # Reset all mocks to ensure clean state
         mock_create_table.reset_mock()
+        mock_insert_table.reset_mock()
+        mock_delete_table.reset_mock()
         # TIL: reset_mock() doesn't clear side effects....
         mock_create_table.side_effect = None
 
+        # Set return values explicitly
         mock_create_table.return_value = None
 
         with self.assertLogs(level=ERROR) as logs:
             dest.save(df)
 
         mock_create_table.assert_called_once()
+        mock_delete_table.assert_called_once()
+        mock_insert_table.assert_not_called()  # create failed, so no insert
+
         self.assertIn("Dune Upload Failed", logs.output[0])