reintroduce upload_csv and unstable replace
bh2smith committed Jan 11, 2025
1 parent 40d827e commit 73f4914
Showing 7 changed files with 94 additions and 65 deletions.
9 changes: 7 additions & 2 deletions config.yaml
@@ -33,11 +33,16 @@ jobs:
- name: p2d-test
source:
ref: PG
query_string: "SELECT 1 as number, '\\x1234'::bytea as my_bytes;"
query_string: |
SELECT
1 AS number,
'\\x1234'::bytea AS my_bytes,
'{"key": "value", "array": [1, 2, 3], "dict": {}}'::json AS my_json,
'[ [{"x": 1}, {"y": 2}], null, [{"z": 3}] ]'::json AS list_dict
destination:
ref: Dune
table_name: bh2smith.dune_sync_test
if_exists: append
insertion_type: replace

- name: cow-solvers
source:
6 changes: 2 additions & 4 deletions src/config.py
@@ -305,8 +305,6 @@ def _build_destination(
f' "{dest_config["ref"]}" defined in config'
) from e

if_exists = dest_config.get("if_exists", "append")

match dest.type:
case Database.DUNE:
try:
@@ -325,14 +323,14 @@
api_key=dest.key,
table_name=dest_config["table_name"],
request_timeout=request_timeout,
if_exists=if_exists,
insertion_type=dest_config.get("insertion_type", "append"),
)

case Database.POSTGRES:
return PostgresDestination(
db_url=dest.key,
table_name=dest_config["table_name"],
if_exists=if_exists,
if_exists=dest_config.get("if_exists", "append"),
index_columns=dest_config.get("index_columns", []),
)
raise ValueError(f"Unsupported destination_db type: {dest}")
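For context on the key rename above: the shared if_exists lookup is gone, and each destination branch now reads its own key. A rough sketch of how the defaults resolve under the new logic (the dicts below are made-up examples, not values from the repository's config):

# Hypothetical parsed destination configs, for illustration only.
dune_cfg = {"ref": "Dune", "table_name": "bh2smith.dune_sync_test", "insertion_type": "replace"}
pg_cfg = {"ref": "PG", "table_name": "public.results"}

# Dune destinations now consume `insertion_type`, falling back to "append".
insertion_type = dune_cfg.get("insertion_type", "append")  # -> "replace"

# Postgres destinations keep the original `if_exists` key, same default.
if_exists = pg_cfg.get("if_exists", "append")  # -> "append"

print(insertion_type, if_exists)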
4 changes: 0 additions & 4 deletions src/destinations/common.py
@@ -1,5 +1 @@
"""Common structures used in multiple destination implementations."""

from typing import Literal

TableExistsPolicy = Literal["append", "replace", "upsert", "insert_ignore"]
82 changes: 50 additions & 32 deletions src/destinations/dune.py
@@ -1,14 +1,19 @@
"""Destination logic for Dune Analytics."""

import io
from typing import Literal

from dune_client.client import DuneClient
from dune_client.models import DuneError, QueryFailed
from dune_client.query import QueryBase
from dune_client.types import QueryParameter
from pandas import DataFrame

from src.destinations.common import TableExistsPolicy
from src.interfaces import Destination, TypedDataFrame
from src.logger import log

InsertionPolicy = Literal["append", "replace", "upload_csv"]


class DuneDestination(Destination[TypedDataFrame]):
"""A class representing as Dune as a destination.
@@ -34,20 +39,14 @@ def __init__(
api_key: str,
table_name: str,
request_timeout: int,
if_exists: TableExistsPolicy = "append",
insertion_type: InsertionPolicy = "append",
):
self.client = DuneClient(api_key, request_timeout=request_timeout)
if "." not in table_name:
raise ValueError("Table name must be in the format namespace.table_name")

self.table_name: str = table_name
if if_exists not in {"append", "replace"}:
# TODO - Dune (support insert_ignore & upsert on table endpoints).
raise ValueError(
"Unsupported Table Existence Policy! "
"if_exists must be 'append' or 'replace'"
)
self.if_exists: TableExistsPolicy = if_exists
self.insertion_type: InsertionPolicy = insertion_type
super().__init__()

def validate(self) -> bool:
@@ -79,38 +78,57 @@ def save(self, data: TypedDataFrame) -> int:
"""
try:
log.debug("Uploading DF to Dune...")
namespace, table_name = self._get_namespace_and_table_name()
if not self._skip_create():
self.client.create_table(
namespace,
table_name,
schema=[
{"name": name, "type": dtype}
for name, dtype in data.types.items()
],
)
result = self.client.insert_table(
namespace,
table_name,
data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
content_type="text/csv",
)
if not result:
raise RuntimeError("Dune Upload Failed")
log.debug("Inserted DF to Dune, %s", result)
if self.insertion_type == "upload_csv":
self._upload_csv(data.dataframe)
else:
self._insert(data)
log.debug("Inserted DF to Dune, %s")
except DuneError as dune_e:
log.error("Dune did not accept our upload: %s", dune_e)
except (ValueError, RuntimeError) as e:
log.error("Data processing error: %s", e)
return len(data)

def _skip_create(self) -> bool:
return self.if_exists == "append" and self._table_exists()
def _insert(self, data: TypedDataFrame) -> None:
namespace, table_name = self._get_namespace_and_table_name()
if self.insertion_type == "replace":
log.warning("Replacement feature is unstable!")
log.info("Deleting table: %s", table_name)
delete = self.client.delete_table(namespace, table_name)
log.info("Deleted: %s", delete)

if not self._table_exists():
log.info("Creating table: %s", self.table_name)
create = self.client.create_table(
namespace,
table_name,
schema=[
{"name": name, "type": dtype} for name, dtype in data.types.items()
],
)
if not create:
raise RuntimeError("Dune Upload Failed")
log.info("Created: %s", create)
log.info("Inserting to: %s", self.table_name)
self.client.insert_table(
namespace,
table_name,
data=io.BytesIO(data.dataframe.to_csv(index=False).encode()),
content_type="text/csv",
)

def _upload_csv(self, data: DataFrame) -> None:
self.client.upload_csv(self.table_name, data.to_csv(index=False))

def _table_exists(self) -> bool:
try:
self.client.run_sql(f"SELECT count(*) FROM dune.{self.table_name}")
return True
results = self.client.run_query(
QueryBase(
4554525,
params=[QueryParameter.text_type("table_name", self.table_name)],
)
)
return results.result is not None
except QueryFailed:
return False

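Putting the reworked DuneDestination together: "upload_csv" routes through the plain CSV upload endpoint, "replace" deletes the table before re-creating and inserting (flagged as unstable in the log), and "append" only creates the table when it does not already exist. A hedged usage sketch — the table name and environment variable are placeholders, not values from the repository:

import os

from src.destinations.dune import DuneDestination

dest = DuneDestination(
    api_key=os.environ["DUNE_API_KEY"],   # assumes the key is provided via the environment
    table_name="my_user.my_table",        # must be in namespace.table_name format
    request_timeout=10,
    insertion_type="replace",             # one of "append" | "replace" | "upload_csv"
)

# dest.save(typed_df) would now delete any existing table, re-create it from
# typed_df.types, and insert the CSV-encoded rows via the table endpoints.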
3 changes: 2 additions & 1 deletion src/destinations/postgres.py
@@ -11,10 +11,11 @@
)
from sqlalchemy.dialects.postgresql import insert

from src.destinations.common import TableExistsPolicy
from src.interfaces import Destination, TypedDataFrame
from src.logger import log

TableExistsPolicy = Literal["append", "replace", "upsert", "insert_ignore"]


class PostgresDestination(Destination[TypedDataFrame]):
"""A class representing PostgreSQL as a destination for data storage.
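With Dune now carrying its own InsertionPolicy, the TableExistsPolicy alias lives next to its only consumer. It is purely a typing aid: a checker such as mypy rejects any string outside the four listed values. A minimal illustration (toy function, not from the codebase):

from typing import Literal

TableExistsPolicy = Literal["append", "replace", "upsert", "insert_ignore"]

def apply_policy(policy: TableExistsPolicy) -> None:
    # At runtime this is an ordinary string; the constraint is enforced statically.
    print(f"using policy: {policy}")

apply_policy("upsert")      # accepted
apply_policy("overwrite")   # mypy: incompatible argument type (still runs at runtime)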
3 changes: 2 additions & 1 deletion src/sources/postgres.py
@@ -142,7 +142,8 @@ async def fetch(self) -> TypedDataFrame:
with self.engine.connect() as conn:
result = conn.execute(text(self.query_string))
types = {
col.name: PG_TO_DUNE[col.type_code] for col in result.cursor.description
col.name: PG_TO_DUNE.get(col.type_code, "varchar")
for col in result.cursor.description
}
df = await loop.run_in_executor(
None, lambda: pd.read_sql_query(self.query_string, con=self.engine)
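The lookup above used to be a hard PG_TO_DUNE[col.type_code], which raised KeyError for any Postgres type without a Dune mapping; unknown type codes now degrade to varchar, which is presumably what lets the new json columns in config.yaml sync at all. A small sketch of the fallback (the map contents and type codes here are illustrative, not the project's full PG_TO_DUNE):

# Illustrative subset: Postgres type OIDs mapped to Dune column types.
PG_TO_DUNE = {23: "integer", 17: "varbinary"}  # int4, bytea

json_type_code = 114  # Postgres OID for json, assumed absent from the map

dune_type = PG_TO_DUNE.get(json_type_code, "varchar")
print(dune_type)  # "varchar" -- the old PG_TO_DUNE[114] would have raised KeyError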
52 changes: 31 additions & 21 deletions tests/unit/destinations_test.py
@@ -32,41 +32,36 @@ def test_init_validation(self):
api_key="anything",
table_name="INVALID_TABLE_NAME",
request_timeout=10,
if_exists="replace",
insertion_type="replace",
)
self.assertIn(
"Table name must be in the format namespace.table_name",
ctx.exception.args[0],
)

with self.assertRaises(ValueError) as ctx:
DuneDestination(
api_key="anything",
table_name="table.name",
request_timeout=10,
if_exists="upsert",
)
self.assertIn("Unsupported Table Existence Policy!", ctx.exception.args[0])

def test_table_exists(self):
mock_client = Mock()

dest = DuneDestination(
api_key="anything",
table_name="table.name",
request_timeout=10,
if_exists="append",
insertion_type="append",
)
dest.client = mock_client
mock_client.run_sql.return_value = None # Not Raise!
mock_result = Mock()
mock_result.result = "non-empty-result"
mock_client.run_query.return_value = mock_result
self.assertEqual(True, dest._table_exists())

mock_client.run_sql.side_effect = QueryFailed("Table not found")
mock_client.run_query.side_effect = QueryFailed("Table not found")
self.assertEqual(False, dest._table_exists())

@patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
@patch("dune_client.api.table.TableAPI.delete_table", name="Fake Table Deleter")
@patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
@patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
def test_ensure_index_disabled_when_uploading(
self, mock_create_table, mock_insert_table, *_
self, mock_create_table, mock_insert_table, mock_delete_table, *_
):
mock_create_table.return_value = {
"namespace": "my_user",
@@ -76,7 +71,7 @@ def test_ensure_index_disabled_when_uploading(
"already_existed": False,
"message": "Table created successfully",
}

mock_delete_table.return_value = {"message": "Table deleted successfully"}
mock_insert_table.return_value = {"rows_written": 9000, "bytes_written": 90}

dummy_df = TypedDataFrame(
@@ -88,19 +83,19 @@
),
types={"foo": "varchar", "baz": "varchar"},
)
mock_client = Mock()
destination = DuneDestination(
api_key=os.getenv("DUNE_API_KEY"),
table_name="foo.bar",
request_timeout=10,
if_exists="replace",
insertion_type="replace",
)
destination._table_exists = Mock(return_value=True)

with self.assertLogs(level=DEBUG) as logs:
destination.save(dummy_df)

self.assertIn("Uploading DF to Dune", logs.output[0])
self.assertIn("Inserted DF to Dune,", logs.output[1])
self.assertIn("Inserted DF to Dune,", logs.output[-1])

@patch("pandas.core.generic.NDFrame.to_csv", name="Fake csv writer")
def test_duneclient_sets_timeout(self, mock_to_csv, *_):
@@ -114,13 +109,18 @@ def test_duneclient_sets_timeout(self, mock_to_csv, *_):

@patch("dune_client.api.table.TableAPI.create_table", name="Fake Table Creator")
@patch("dune_client.api.table.TableAPI.insert_table", name="Fake Table Inserter")
def test_dune_error_handling(self, mock_create_table, mock_insert_table):
@patch("dune_client.api.table.TableAPI.delete_table", name="Fake Table Deleter")
def test_dune_error_handling(
self, mock_delete_table, mock_insert_table, mock_create_table, *_
):
dest = DuneDestination(
api_key="f00b4r",
table_name="foo.bar",
request_timeout=10,
if_exists="replace",
insertion_type="replace",
)
dest._table_exists = Mock(return_value=False)

df = TypedDataFrame(pd.DataFrame([{"foo": "bar"}]), {})

mock_create_table.return_value = {
@@ -131,6 +131,9 @@ def test_dune_error_handling(self, mock_create_table, mock_insert_table):
"already_existed": False,
"message": "Table created successfully",
}
mock_delete_table.return_value = {
"message": "Table bh2smith.dune_sync_test successfully deleted"
}
mock_insert_table.return_value = {"rows_written": 9000, "bytes_written": 90}
dune_err = DuneError(
data={"error": "bad stuff"},
@@ -173,17 +176,24 @@ def test_dune_error_handling(self, mock_create_table, mock_insert_table):
expected_message = "Data processing error: Big Oops"
self.assertIn(expected_message, logs.output[0])

# Reset all mocks to ensure clean state
mock_create_table.reset_mock()
mock_insert_table.reset_mock()
mock_delete_table.reset_mock()

# TIL: reset_mock() doesn't clear side effects....
mock_create_table.side_effect = None
mock_create_table.return_value = None

# Set return values explicitly
mock_create_table.return_value = None

with self.assertLogs(level=ERROR) as logs:
dest.save(df)

mock_create_table.assert_called_once()
mock_delete_table.assert_called_once()

self.assertIn("Dune Upload Failed", logs.output[0])


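One detail the re-ordered test signatures above rely on: stacked @patch decorators inject their mocks bottom-up, so the decorator closest to the function supplies the first positional mock argument. A tiny standalone illustration (toy patch targets, unrelated to the project):

from unittest.mock import patch

@patch("os.path.isdir")    # applied last  -> third mock argument
@patch("os.path.isfile")   # applied second -> second mock argument
@patch("os.path.exists")   # closest to the function -> first mock argument
def demo(mock_exists, mock_isfile, mock_isdir):
    print(mock_exists is mock_isdir)  # False: each patched target gets its own mock

demo()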
