diff --git a/.changes/unreleased/Under the Hood-20240625-110833.yaml b/.changes/unreleased/Under the Hood-20240625-110833.yaml new file mode 100644 index 000000000..c4486a8f0 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240625-110833.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Rework record/replay to record at the database connection level. +time: 2024-06-25T11:08:33.264457-04:00 +custom: + Author: peteralllenwebb + Issue: "244" diff --git a/dbt/adapters/record.py b/dbt/adapters/record.py deleted file mode 100644 index 5204f59ce..000000000 --- a/dbt/adapters/record.py +++ /dev/null @@ -1,67 +0,0 @@ -import dataclasses -from io import StringIO -import json -import re -from typing import Any, Optional, Mapping - -from agate import Table - -from dbt_common.events.contextvars import get_node_info -from dbt_common.record import Record, Recorder - -from dbt.adapters.contracts.connection import AdapterResponse - - -@dataclasses.dataclass -class QueryRecordParams: - sql: str - auto_begin: bool = False - fetch: bool = False - limit: Optional[int] = None - node_unique_id: Optional[str] = None - - def __post_init__(self) -> None: - if self.node_unique_id is None: - node_info = get_node_info() - self.node_unique_id = node_info["unique_id"] if node_info else "" - - @staticmethod - def _clean_up_sql(sql: str) -> str: - sql = re.sub(r"--.*?\n", "", sql) # Remove single-line comments (--) - sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL) # Remove multi-line comments (/* */) - return sql.replace(" ", "").replace("\n", "") - - def _matches(self, other: "QueryRecordParams") -> bool: - return self.node_unique_id == other.node_unique_id and self._clean_up_sql( - self.sql - ) == self._clean_up_sql(other.sql) - - -@dataclasses.dataclass -class QueryRecordResult: - adapter_response: Optional["AdapterResponse"] - table: Optional[Table] - - def _to_dict(self) -> Any: - buf = StringIO() - self.table.to_json(buf) # type: ignore - - return { - "adapter_response": self.adapter_response.to_dict(), # type: ignore - "table": buf.getvalue(), - } - - @classmethod - def _from_dict(cls, dct: Mapping) -> "QueryRecordResult": - return QueryRecordResult( - adapter_response=AdapterResponse.from_dict(dct["adapter_response"]), - table=Table.from_object(json.loads(dct["table"])), - ) - - -class QueryRecord(Record): - params_cls = QueryRecordParams - result_cls = QueryRecordResult - - -Recorder.register_record_type(QueryRecord) diff --git a/dbt/adapters/record/__init__.py b/dbt/adapters/record/__init__.py new file mode 100644 index 000000000..afde4a01c --- /dev/null +++ b/dbt/adapters/record/__init__.py @@ -0,0 +1,2 @@ +from dbt.adapters.record.handle import RecordReplayHandle +from dbt.adapters.record.cursor.cursor import RecordReplayCursor diff --git a/dbt/adapters/record/cursor/cursor.py b/dbt/adapters/record/cursor/cursor.py new file mode 100644 index 000000000..577178dbb --- /dev/null +++ b/dbt/adapters/record/cursor/cursor.py @@ -0,0 +1,54 @@ +from typing import Any, Optional + +from dbt_common.record import record_function + +from dbt.adapters.contracts.connection import Connection +from dbt.adapters.record.cursor.description import CursorGetDescriptionRecord +from dbt.adapters.record.cursor.execute import CursorExecuteRecord +from dbt.adapters.record.cursor.fetchone import CursorFetchOneRecord +from dbt.adapters.record.cursor.fetchmany import CursorFetchManyRecord +from dbt.adapters.record.cursor.fetchall import CursorFetchAllRecord +from dbt.adapters.record.cursor.rowcount import CursorGetRowCountRecord + + +class RecordReplayCursor: + """A proxy object used to wrap native database cursors under record/replay + modes. In record mode, this proxy notes the parameters and return values + of the methods and properties it implements, which closely match the Python + DB API 2.0 cursor methods used by many dbt adapters to interact with the + database or DWH. In replay mode, it mocks out those calls using previously + recorded calls, so that no interaction with a database actually occurs.""" + + def __init__(self, native_cursor: Any, connection: Connection) -> None: + self.native_cursor = native_cursor + self.connection = connection + + @record_function(CursorExecuteRecord, method=True, id_field_name="connection_name") + def execute(self, operation, parameters=None) -> None: + self.native_cursor.execute(operation, parameters) + + @record_function(CursorFetchOneRecord, method=True, id_field_name="connection_name") + def fetchone(self) -> Any: + return self.native_cursor.fetchone() + + @record_function(CursorFetchManyRecord, method=True, id_field_name="connection_name") + def fetchmany(self, size: int) -> Any: + return self.native_cursor.fetchmany(size) + + @record_function(CursorFetchAllRecord, method=True, id_field_name="connection_name") + def fetchall(self) -> Any: + return self.native_cursor.fetchall() + + @property + def connection_name(self) -> Optional[str]: + return self.connection.name + + @property + @record_function(CursorGetRowCountRecord, method=True, id_field_name="connection_name") + def rowcount(self) -> int: + return self.native_cursor.rowcount + + @property + @record_function(CursorGetDescriptionRecord, method=True, id_field_name="connection_name") + def description(self) -> str: + return self.native_cursor.description diff --git a/dbt/adapters/record/cursor/description.py b/dbt/adapters/record/cursor/description.py new file mode 100644 index 000000000..d6ba15d97 --- /dev/null +++ b/dbt/adapters/record/cursor/description.py @@ -0,0 +1,37 @@ +import dataclasses +from typing import Any, Iterable, Mapping + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorGetDescriptionParams: + connection_name: str + + +@dataclasses.dataclass +class CursorGetDescriptionResult: + columns: Iterable[Any] + + def _to_dict(self) -> Any: + column_dicts = [] + for c in self.columns: + # This captures the mandatory column information, but we might need + # more for some adapters. + # See https://peps.python.org/pep-0249/#description + column_dicts.append((c[0], c[1])) + + return {"columns": column_dicts} + + @classmethod + def _from_dict(cls, dct: Mapping) -> "CursorGetDescriptionResult": + return CursorGetDescriptionResult(columns=dct["columns"]) + + +@Recorder.register_record_type +class CursorGetDescriptionRecord(Record): + """Implements record/replay support for the cursor.description property.""" + + params_cls = CursorGetDescriptionParams + result_cls = CursorGetDescriptionResult + group = "Database" diff --git a/dbt/adapters/record/cursor/execute.py b/dbt/adapters/record/cursor/execute.py new file mode 100644 index 000000000..e7e698591 --- /dev/null +++ b/dbt/adapters/record/cursor/execute.py @@ -0,0 +1,20 @@ +import dataclasses +from typing import Any, Iterable, Union, Mapping + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorExecuteParams: + connection_name: str + operation: str + parameters: Union[Iterable[Any], Mapping[str, Any]] + + +@Recorder.register_record_type +class CursorExecuteRecord(Record): + """Implements record/replay support for the cursor.execute() method.""" + + params_cls = CursorExecuteParams + result_cls = None + group = "Database" diff --git a/dbt/adapters/record/cursor/fetchall.py b/dbt/adapters/record/cursor/fetchall.py new file mode 100644 index 000000000..090cc1603 --- /dev/null +++ b/dbt/adapters/record/cursor/fetchall.py @@ -0,0 +1,66 @@ +import dataclasses +import datetime +from typing import Any, Dict, List, Mapping + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorFetchAllParams: + connection_name: str + + +@dataclasses.dataclass +class CursorFetchAllResult: + results: List[Any] + + def _to_dict(self) -> Dict[str, Any]: + processed_results = [] + for result in self.results: + result = tuple(map(self._process_value, result)) + processed_results.append(result) + + return {"results": processed_results} + + @classmethod + def _from_dict(cls, dct: Mapping) -> "CursorFetchAllResult": + unprocessed_results = [] + for result in dct["results"]: + result = tuple(map(cls._unprocess_value, result)) + unprocessed_results.append(result) + + return CursorFetchAllResult(unprocessed_results) + + @classmethod + def _process_value(cls, value: Any) -> Any: + if type(value) is datetime.date: + return {"type": "date", "value": value.isoformat()} + elif type(value) is datetime.datetime: + return {"type": "datetime", "value": value.isoformat()} + else: + return value + + @classmethod + def _unprocess_value(cls, value: Any) -> Any: + if type(value) is dict: + value_type = value.get("type") + if value_type == "date": + date_string = value.get("value") + assert isinstance(date_string, str) + return datetime.date.fromisoformat(date_string) + elif value_type == "datetime": + date_string = value.get("value") + assert isinstance(date_string, str) + return datetime.datetime.fromisoformat(date_string) + return value + else: + return value + + +@Recorder.register_record_type +class CursorFetchAllRecord(Record): + """Implements record/replay support for the cursor.fetchall() method.""" + + params_cls = CursorFetchAllParams + result_cls = CursorFetchAllResult + group = "Database" diff --git a/dbt/adapters/record/cursor/fetchmany.py b/dbt/adapters/record/cursor/fetchmany.py new file mode 100644 index 000000000..86f154408 --- /dev/null +++ b/dbt/adapters/record/cursor/fetchmany.py @@ -0,0 +1,23 @@ +import dataclasses +from typing import Any, List + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorFetchManyParams: + connection_name: str + + +@dataclasses.dataclass +class CursorFetchManyResult: + results: List[Any] + + +@Recorder.register_record_type +class CursorFetchManyRecord(Record): + """Implements record/replay support for the cursor.fetchmany() method.""" + + params_cls = CursorFetchManyParams + result_cls = CursorFetchManyResult + group = "Database" diff --git a/dbt/adapters/record/cursor/fetchone.py b/dbt/adapters/record/cursor/fetchone.py new file mode 100644 index 000000000..42ffe210c --- /dev/null +++ b/dbt/adapters/record/cursor/fetchone.py @@ -0,0 +1,23 @@ +import dataclasses +from typing import Any + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorFetchOneParams: + connection_name: str + + +@dataclasses.dataclass +class CursorFetchOneResult: + result: Any + + +@Recorder.register_record_type +class CursorFetchOneRecord(Record): + """Implements record/replay support for the cursor.fetchone() method.""" + + params_cls = CursorFetchOneParams + result_cls = CursorFetchOneResult + group = "Database" diff --git a/dbt/adapters/record/cursor/rowcount.py b/dbt/adapters/record/cursor/rowcount.py new file mode 100644 index 000000000..c024817ef --- /dev/null +++ b/dbt/adapters/record/cursor/rowcount.py @@ -0,0 +1,23 @@ +import dataclasses +from typing import Optional + +from dbt_common.record import Record, Recorder + + +@dataclasses.dataclass +class CursorGetRowCountParams: + connection_name: str + + +@dataclasses.dataclass +class CursorGetRowCountResult: + rowcount: Optional[int] + + +@Recorder.register_record_type +class CursorGetRowCountRecord(Record): + """Implements record/replay support for the cursor.rowcount property.""" + + params_cls = CursorGetRowCountParams + result_cls = CursorGetRowCountResult + group = "Database" diff --git a/dbt/adapters/record/handle.py b/dbt/adapters/record/handle.py new file mode 100644 index 000000000..31817c374 --- /dev/null +++ b/dbt/adapters/record/handle.py @@ -0,0 +1,24 @@ +from typing import Any + +from dbt.adapters.contracts.connection import Connection + +from dbt.adapters.record.cursor.cursor import RecordReplayCursor + + +class RecordReplayHandle: + """A proxy object used for record/replay modes. What adapters call a + 'handle' is typically a native database connection, but should not be + confused with the Connection protocol, which is a dbt-adapters concept. + + Currently, the only function of the handle proxy is to provide a record/replay + aware cursor object when cursor() is called.""" + + def __init__(self, native_handle: Any, connection: Connection) -> None: + self.native_handle = native_handle + self.connection = connection + + def cursor(self) -> Any: + # The native handle could be None if we are in replay mode, because no + # actual database access should be performed in that mode. + cursor = None if self.native_handle is None else self.native_handle.cursor() + return RecordReplayCursor(cursor, self.connection) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 13111a6e8..4d450c88c 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -5,7 +5,6 @@ from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event from dbt_common.exceptions import DbtInternalError, NotImplementedError -from dbt_common.record import record_function from dbt_common.utils import cast_to_str from dbt.adapters.base import BaseConnectionManager @@ -20,7 +19,6 @@ SQLQuery, SQLQueryStatus, ) -from dbt.adapters.record import QueryRecord if TYPE_CHECKING: import agate @@ -143,7 +141,6 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab return table_from_data_flat(data, column_names) - @record_function(QueryRecord, method=True, tuple_result=True) def execute( self, sql: str, diff --git a/docs/guides/record_replay.md b/docs/guides/record_replay.md index 670bb8431..5bcbec06c 100644 --- a/docs/guides/record_replay.md +++ b/docs/guides/record_replay.md @@ -4,24 +4,12 @@ This document describes how to implement support for dbt's Record/Replay Subsyst ## Recording and Replaying Warehouse Interaction -The goal of the Record/Replay Subsystem is to record all interactions between dbt and external systems, of which the data warehouse is the most obvious. Since, warehouse interaction is mediated by adapters, full Record/Replay support requires that adapters record all interactions they have with the warehouse. (It also requires that they record access to the local filesystem or external service, if that is access is not mediated by dbt itself. This includes authentication steps, opening and closing connections, beginning and ending transactions, and so forth.) +The goal of the Record/Replay Subsystem is to record all interactions between dbt and external systems, of which the data warehouse is the most important. Since, warehouse interaction is mediated by adapters, full Record/Replay support requires that adapters record all interactions they have with the warehouse. It also requires that they record access to the local filesystem or external service, if that is access is not mediated by dbt itself. This includes authentication steps, opening and closing connections, beginning and ending transactions, etc. -In practice, this means that any request sent to the warehouse must be recorded, along with the corresponding response. If this is done correctly, as described in the document linked in the intro, the Record portion of the Record/Replay subsystem should work as expected. - -At the time of this writing, there is only an incomplete implementation of this goal, which can be found in `dbt-adapters/dbt/adapters/record.py`. - -There are some important things to notice about this implementation. First, the QueryRecordResult class provides custom serialization methods `to_dict()` and `from_dict()`. This is necessary because the `AdapterResponse` and `Agate.Table` types cannot be automatically converted to and from JSON by the dataclass library, and JSON is the format used to persist recordings to disk and reload them for replay. - -Another important feature is that `QueryRecordParams` implements the `_matches()` method. This method allows `dbt-adapters` to customize the way that the Record/Replay determines whether a query issued by dbt matches a previously recorded query. In this case, the method performs a comparison which attempts to ignore comments and whitespace which would not affect query behavior. +A basic implementation of Record/Replay functionality, suitable for most adapters which extend the `SQLAdapter` class, can be found in `dbt-adapters/dbt/adapters/record`. The `RecordReplayHandle` and `RecordReplayCursor` classes defined there are used to intercept and record or replay all DWH interactions. They are an excellent starting point for adapters which extend `SQLAdapter` and use a database library which substantially conforms to Python's DB API v2.0 (PEP 249). Examples of how library-specific deviations from that API can be found in the dbt-postgress and dbt-snowflake repositories. ## Misc. Notes and Suggestions -Currently, support for recording data warehouse interaction is very rudimentary, however, even rudimentary support is valuable and we should be concentrating on extending it in a way that adds the most value with the least work. Usefulness, rather than perfection, is the initial goal. - -Picking the right functions to record, at the right level of abstraction, will probably be the most important part of carrying this work forward. - Not every interaction with an external system has to be recorded in full detail, and authentication might prove to be a place where we exclude sensitive secrets from the recording. For example, since replay will not actually be communicating with the warehouse, it may be possible to exclude passwords and auth keys from the parameters recorded, and to exclude auth tokens from the results. In addition to adding an appropriate decorator to functions which communicate with external systems, you should check those functions for side-effects. Since the function's calls will be mocked out in replay mode, those side-effects will not be carried out during replay. At present, we are focusing on support for recording and comparing recordings, but this is worth keeping in mind. - -The current implementation records which dbt node issues a query, and uses that information to ensure a match during replay. The same node should issue the same query. A better model might be to monitor which connection issued which query, and associate the same connection with open/close operations, transaction starts/stops and so forth.