From df6282bfafa351bd77be4a8a436860de4ee3215b Mon Sep 17 00:00:00 2001 From: Jorrit Sandbrink <47451109+jorritsandbrink@users.noreply.github.com> Date: Thu, 25 Apr 2024 15:58:58 +0400 Subject: [PATCH] `scd2` extension: active record literal changes (#1275) * change default active record literal type to null and make configurable * replace active_record_literal_type with active_record_timestamp setting * add test for active record timestamp varieties * add docs for scd2 active_record_timestamp setting * fix docs --------- Co-authored-by: Jorrit Sandbrink Co-authored-by: Dave --- dlt/common/schema/typing.py | 9 +- dlt/common/schema/utils.py | 12 +- dlt/destinations/sql_jobs.py | 19 +- dlt/extract/hints.py | 6 +- .../docs/general-usage/incremental-loading.md | 35 ++-- tests/load/pipeline/test_scd2.py | 169 +++++++++++++----- 6 files changed, 176 insertions(+), 74 deletions(-) diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index e1022cfa84..a196e07621 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -17,7 +17,8 @@ from dlt.common.data_types import TDataType from dlt.common.normalizers.typing import TNormalizersConfig -from dlt.common.typing import TSortOrder +from dlt.common.typing import TSortOrder, TAnyDateTime +from dlt.common.pendulum import pendulum try: from pydantic import BaseModel as _PydanticBaseModel @@ -33,8 +34,6 @@ LOADS_TABLE_NAME = "_dlt_loads" STATE_TABLE_NAME = "_dlt_pipeline_state" DLT_NAME_PREFIX = "_dlt" -DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"] -"""Default values for validity column names used in `scd2` merge strategy.""" TColumnProp = Literal[ "name", @@ -161,6 +160,9 @@ class NormalizerInfo(TypedDict, total=True): WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition)) MERGE_STRATEGIES: Set[TLoaderMergeStrategy] = set(get_args(TLoaderMergeStrategy)) +DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"] +"""Default values for validity column names used in `scd2` merge strategy.""" + class TWriteDispositionDict(TypedDict): disposition: TWriteDisposition @@ -169,6 +171,7 @@ class TWriteDispositionDict(TypedDict): class TMergeDispositionDict(TWriteDispositionDict, total=False): strategy: Optional[TLoaderMergeStrategy] validity_column_names: Optional[List[str]] + active_record_timestamp: Optional[TAnyDateTime] row_version_column_name: Optional[str] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 8da9029124..6be124cf2a 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -5,6 +5,8 @@ from copy import deepcopy, copy from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union +from dlt.common.pendulum import pendulum +from dlt.common.time import ensure_pendulum_datetime from dlt.common.json import json from dlt.common.data_types import TDataType from dlt.common.exceptions import DictValidationException @@ -481,7 +483,8 @@ def get_columns_names_with_prop( return [ c["name"] for c in table["columns"].values() - if bool(c.get(column_prop, False)) and (include_incomplete or is_complete_column(c)) + if (bool(c.get(column_prop, False)) or c.get(column_prop, False) is None) + and (include_incomplete or is_complete_column(c)) ] @@ -525,6 +528,13 @@ def get_validity_column_names(table: TTableSchema) -> List[Optional[str]]: ] +def get_active_record_timestamp(table: TTableSchema) -> Optional[pendulum.DateTime]: + # method assumes a column with "x-active-record-timestamp" property exists + 
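+    # a None hint value means NULL marks active records; otherwise the value is coerced to a pendulum DateTime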
cname = get_first_column_name_with_prop(table, "x-active-record-timestamp") + hint_val = table["columns"][cname]["x-active-record-timestamp"] # type: ignore[typeddict-item] + return None if hint_val is None else ensure_pendulum_datetime(hint_val) + + def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables: aggregated_update: TSchemaTables = {} for schema_update in schema_updates: diff --git a/dlt/destinations/sql_jobs.py b/dlt/destinations/sql_jobs.py index e7993106e1..ad91a76890 100644 --- a/dlt/destinations/sql_jobs.py +++ b/dlt/destinations/sql_jobs.py @@ -13,6 +13,7 @@ get_first_column_name_with_prop, get_dedup_sort_tuple, get_validity_column_names, + get_active_record_timestamp, DEFAULT_MERGE_STRATEGY, ) from dlt.common.storages.load_storage import ParsedLoadJobFileName @@ -24,10 +25,6 @@ from dlt.pipeline.current import load_package as current_load_package -HIGH_TS = pendulum.datetime(9999, 12, 31) -"""High timestamp used to indicate active records in `scd2` merge strategy.""" - - class SqlJobParams(TypedDict, total=False): replace: Optional[bool] table_chain_create_table_statements: Dict[str, Sequence[str]] @@ -546,12 +543,20 @@ def gen_scd2_sql( current_load_package()["state"]["created_at"], caps.timestamp_precision, ) - active_record_ts = format_datetime_literal(HIGH_TS, caps.timestamp_precision) + active_record_timestamp = get_active_record_timestamp(root_table) + if active_record_timestamp is None: + active_record_literal = "NULL" + is_active_clause = f"{to} IS NULL" + else: # it's a datetime + active_record_literal = format_datetime_literal( + active_record_timestamp, caps.timestamp_precision + ) + is_active_clause = f"{to} = {active_record_literal}" # retire updated and deleted records sql.append(f""" UPDATE {root_table_name} SET {to} = {boundary_ts} - WHERE {to} = {active_record_ts} + WHERE {is_active_clause} AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name}); """) @@ -560,7 +565,7 @@ def gen_scd2_sql( col_str = ", ".join([c for c in columns if c not in (from_, to)]) sql.append(f""" INSERT INTO {root_table_name} ({col_str}, {from_}, {to}) - SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_ts} AS {to} + SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_literal} AS {to} FROM {staging_root_table_name} AS s WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name}); """) diff --git a/dlt/extract/hints.py b/dlt/extract/hints.py index 97da7dab9c..75ad02e3fe 100644 --- a/dlt/extract/hints.py +++ b/dlt/extract/hints.py @@ -443,11 +443,9 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None: "data_type": "timestamp", "nullable": True, "x-valid-to": True, + "x-active-record-timestamp": mddict.get("active_record_timestamp"), } - if mddict.get("row_version_column_name") is None: - hash_ = "_dlt_id" - else: - hash_ = mddict["row_version_column_name"] + hash_ = mddict.get("row_version_column_name", "_dlt_id") dict_["columns"][hash_] = { "name": hash_, "nullable": False, diff --git a/docs/website/docs/general-usage/incremental-loading.md b/docs/website/docs/general-usage/incremental-loading.md index 38d3215e68..bf0a54c9d8 100644 --- a/docs/website/docs/general-usage/incremental-loading.md +++ b/docs/website/docs/general-usage/incremental-loading.md @@ -242,8 +242,8 @@ In example above we enforce the root key propagation with `fb_ads.root_key = Tru that correct data is propagated on initial `replace` load so the future `merge` load can be executed. 
You can achieve the same in the decorator `@dlt.source(root_key=True)`. -### ๐Ÿงช `scd2` strategy -`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A high timestamp (9999-12-31 00:00:00.000000) is used to indicate an active record. +### `scd2` strategy +`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g. 9999-12-31 00:00:00.000000) instead. #### Example: `scd2` merge strategy ```py @@ -265,8 +265,8 @@ pipeline.run(dim_customer()) # first run โ€” 2024-04-09 18:27:53.734235 | `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` | | -- | -- | -- | -- | -- | -| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 1 | foo | 1 | -| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 | +| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 | +| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 | ```py ... @@ -285,8 +285,8 @@ pipeline.run(dim_customer()) # second run โ€” 2024-04-09 22:13:07.943703 | `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` | | -- | -- | -- | -- | -- | | 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 | -| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 | -| **2024-04-09 22:13:07.943703** | **9999-12-31 00:00:00.000000** | **1** | **foo_updated** | **1** | +| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 | +| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** | ```py ... @@ -305,14 +305,9 @@ pipeline.run(dim_customer()) # third run โ€” 2024-04-10 06:45:22.847403 | -- | -- | -- | -- | -- | | 2024-04-09 18:27:53.734235 | 2024-04-09 22:13:07.943703 | 1 | foo | 1 | | 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 | -| 2024-04-09 22:13:07.943703 | 9999-12-31 00:00:00.000000 | 1 | foo_updated | 1 | +| 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 | -:::caution -SCD2 is still work in progress. We plan to change the default **high timestamp** from `9999-12-31 00:00:00.000000` to `NULL` -and make it configurable. This feature will be released with `dlt` 0.4.10 -::: - -#### Example: customize validity column names +#### Example: configure validity column names `_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows: ```py @dlt.resource( @@ -327,6 +322,20 @@ def dim_customer(): ... ``` +#### Example: configure active record timestamp +You can configure the literal used to indicate an active record with `active_record_timestamp`. The default literal `NULL` is used if `active_record_timestamp` is omitted or set to `None`. Provide a date value if you prefer to use a high timestamp instead. 
+```py +@dlt.resource( + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "active_record_timestamp": "9999-12-31", # e.g. datetime.datetime(9999, 12, 31) is also accepted + } +) +def dim_customer(): + ... +``` + #### Example: use your own row hash By default, `dlt` generates a row hash based on all columns provided by the resource and stores it in `_dlt_id`. You can use your own hash instead by specifying `row_version_column_name` in the `write_disposition` dictionary. You might already have a column present in your resource that can naturally serve as row hash, in which case it's more efficient to use those pre-existing hash values than to generate new artificial ones. This option also allows you to use hashes based on a subset of columns, in case you want to ignore changes in some of the columns. When using your own hash, values for `_dlt_id` are randomly generated. ```py diff --git a/tests/load/pipeline/test_scd2.py b/tests/load/pipeline/test_scd2.py index 8ea39d31e5..d5d76580d7 100644 --- a/tests/load/pipeline/test_scd2.py +++ b/tests/load/pipeline/test_scd2.py @@ -1,18 +1,18 @@ # timezone is removed from all datetime objects in these tests to simplify comparison import pytest -from typing import List, Dict, Any -from datetime import datetime, timezone # noqa: I251 +from typing import List, Dict, Any, Optional +from datetime import date, datetime, timezone # noqa: I251 import dlt +from dlt.common.typing import TAnyDateTime +from dlt.common.pendulum import pendulum from dlt.common.pipeline import LoadInfo from dlt.common.schema.exceptions import ColumnNameConflictException from dlt.common.schema.typing import DEFAULT_VALIDITY_COLUMN_NAMES from dlt.common.normalizers.json.relational import DataItemNormalizer from dlt.common.normalizers.naming.snake_case import NamingConvention as SnakeCaseNamingConvention from dlt.common.time import ensure_pendulum_datetime, reduce_pendulum_datetime_precision -from dlt.common.typing import TDataItem -from dlt.destinations.sql_jobs import HIGH_TS from dlt.extract.resource import DltResource from dlt.pipeline.exceptions import PipelineStepFailed @@ -29,12 +29,6 @@ get_row_hash = DataItemNormalizer.get_row_hash -def get_active_ts(pipeline: dlt.Pipeline) -> datetime: - caps = pipeline._get_destination_capabilities() - active_ts = HIGH_TS.in_timezone(tz="UTC").replace(tzinfo=None) - return reduce_pendulum_datetime_precision(active_ts, caps.timestamp_precision) - - def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) -> datetime: """Returns `created_at` property of load package state.""" load_id = load_info.asdict()["loads_ids"][0] @@ -47,19 +41,15 @@ def get_load_package_created_at(pipeline: dlt.Pipeline, load_info: LoadInfo) -> return reduce_pendulum_datetime_precision(created_at, caps.timestamp_precision) -def strip_timezone(ts: datetime) -> datetime: - """Converts timezone of datetime object to UTC and removes timezone awareness.""" - ts = ensure_pendulum_datetime(ts) - if ts.replace(tzinfo=None) == HIGH_TS: - return ts.replace(tzinfo=None) - else: - return ts.astimezone(tz=timezone.utc).replace(tzinfo=None) - - def get_table( pipeline: dlt.Pipeline, table_name: str, sort_column: str, include_root_id: bool = True ) -> List[Dict[str, Any]]: """Returns destination table contents as list of dictionaries.""" + + def strip_timezone(ts: datetime) -> datetime: + """Converts timezone of datetime object to UTC and removes timezone awareness.""" + return 
ensure_pendulum_datetime(ts).astimezone(tz=timezone.utc).replace(tzinfo=None) + return sorted( [ { @@ -84,21 +74,22 @@ def assert_records_as_set(actual: List[Dict[str, Any]], expected: List[Dict[str, @pytest.mark.essential @pytest.mark.parametrize( - "destination_config,simple,validity_column_names", - [ # test basic case for alle SQL destinations supporting merge - (dconf, True, None) + "destination_config,simple,validity_column_names,active_record_timestamp", + # test basic cases for alle SQL destinations supporting merge + [ + (dconf, True, None, None) for dconf in destinations_configs(default_sql_configs=True, supports_merge=True) ] - + [ # test nested columns and validity column name configuration only for postgres - ( - dconf, - False, - ["from", "to"], - ) # "from" is a SQL keyword, so this also tests if columns are escaped + + [ + (dconf, True, None, pendulum.DateTime(3234, 12, 31, 22, 2, 59)) # arbitrary timestamp + for dconf in destinations_configs(default_sql_configs=True, supports_merge=True) + ] + + [ # test nested columns and validity column name configuration only for postgres and duckdb + (dconf, False, ["from", "to"], None) for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"]) ] + [ - (dconf, False, ["ValidFrom", "ValidTo"]) + (dconf, False, ["ValidFrom", "ValidTo"], None) for dconf in destinations_configs(default_sql_configs=True, subset=["postgres", "duckdb"]) ], ids=lambda x: ( @@ -111,6 +102,7 @@ def test_core_functionality( destination_config: DestinationTestConfiguration, simple: bool, validity_column_names: List[str], + active_record_timestamp: Optional[pendulum.DateTime], ) -> None: p = destination_config.setup_pipeline("abstract", full_refresh=True) @@ -120,6 +112,7 @@ def test_core_functionality( "disposition": "merge", "strategy": "scd2", "validity_column_names": validity_column_names, + "active_record_timestamp": active_record_timestamp, }, ) def r(data): @@ -153,8 +146,20 @@ def r(data): assert_load_info(info) cname = "c2" if simple else "c2__nc1" assert get_table(p, "dim_test", cname) == [ - {from_: ts_1, to: get_active_ts(p), "nk": 2, "c1": "bar", cname: "bar"}, - {from_: ts_1, to: get_active_ts(p), "nk": 1, "c1": "foo", cname: "foo"}, + { + from_: ts_1, + to: active_record_timestamp, + "nk": 2, + "c1": "bar", + cname: "bar", + }, + { + from_: ts_1, + to: active_record_timestamp, + "nk": 1, + "c1": "foo", + cname: "foo", + }, ] # load 2 โ€” update a record @@ -166,9 +171,21 @@ def r(data): ts_2 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", cname) == [ - {from_: ts_1, to: get_active_ts(p), "nk": 2, "c1": "bar", cname: "bar"}, + { + from_: ts_1, + to: active_record_timestamp, + "nk": 2, + "c1": "bar", + cname: "bar", + }, {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, - {from_: ts_2, to: get_active_ts(p), "nk": 1, "c1": "foo", cname: "foo_updated"}, + { + from_: ts_2, + to: active_record_timestamp, + "nk": 1, + "c1": "foo", + cname: "foo_updated", + }, ] # load 3 โ€” delete a record @@ -181,7 +198,13 @@ def r(data): assert get_table(p, "dim_test", cname) == [ {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"}, {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, - {from_: ts_2, to: get_active_ts(p), "nk": 1, "c1": "foo", cname: "foo_updated"}, + { + from_: ts_2, + to: active_record_timestamp, + "nk": 1, + "c1": "foo", + cname: "foo_updated", + }, ] # load 4 โ€” insert a record @@ -194,9 +217,21 @@ def r(data): assert_load_info(info) assert 
get_table(p, "dim_test", cname) == [ {from_: ts_1, to: ts_3, "nk": 2, "c1": "bar", cname: "bar"}, - {from_: ts_4, to: get_active_ts(p), "nk": 3, "c1": "baz", cname: "baz"}, + { + from_: ts_4, + to: active_record_timestamp, + "nk": 3, + "c1": "baz", + cname: "baz", + }, {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", cname: "foo"}, - {from_: ts_2, to: get_active_ts(p), "nk": 1, "c1": "foo", cname: "foo_updated"}, + { + from_: ts_2, + to: active_record_timestamp, + "nk": 1, + "c1": "foo", + cname: "foo_updated", + }, ] @@ -228,8 +263,8 @@ def r(data): ts_1 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {from_: ts_1, to: get_active_ts(p), "nk": 2, "c1": "bar"}, - {from_: ts_1, to: get_active_ts(p), "nk": 1, "c1": "foo"}, + {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, + {from_: ts_1, to: None, "nk": 1, "c1": "foo"}, ] cname = "value" if simple else "cc1" assert get_table(p, "dim_test__c2", cname) == [ @@ -247,9 +282,9 @@ def r(data): ts_2 = get_load_package_created_at(p, info) assert_load_info(info) assert get_table(p, "dim_test", "c1") == [ - {from_: ts_1, to: get_active_ts(p), "nk": 2, "c1": "bar"}, + {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, # updated - {from_: ts_2, to: get_active_ts(p), "nk": 1, "c1": "foo_updated"}, # new + {from_: ts_2, to: None, "nk": 1, "c1": "foo_updated"}, # new ] assert_records_as_set( get_table(p, "dim_test__c2", cname), @@ -276,10 +311,10 @@ def r(data): assert_records_as_set( get_table(p, "dim_test", "c1"), [ - {from_: ts_1, to: get_active_ts(p), "nk": 2, "c1": "bar"}, + {from_: ts_1, to: None, "nk": 2, "c1": "bar"}, {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, # updated - {from_: ts_3, to: get_active_ts(p), "nk": 1, "c1": "foo_updated"}, # new + {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, # new ], ) exp_3 = [ @@ -305,7 +340,7 @@ def r(data): {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, # updated {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, - {from_: ts_3, to: get_active_ts(p), "nk": 1, "c1": "foo_updated"}, + {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, ], ) assert_records_as_set( @@ -324,10 +359,10 @@ def r(data): get_table(p, "dim_test", "c1"), [ {from_: ts_1, to: ts_4, "nk": 2, "c1": "bar"}, - {from_: ts_5, to: get_active_ts(p), "nk": 3, "c1": "baz"}, # new + {from_: ts_5, to: None, "nk": 3, "c1": "baz"}, # new {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo"}, {from_: ts_2, to: ts_3, "nk": 1, "c1": "foo_updated"}, - {from_: ts_3, to: get_active_ts(p), "nk": 1, "c1": "foo_updated"}, + {from_: ts_3, to: None, "nk": 1, "c1": "foo_updated"}, ], ) assert_records_as_set( @@ -468,6 +503,48 @@ def r(data): assert isinstance(pip_ex.value.__context__.__context__, ColumnNameConflictException) +@pytest.mark.parametrize( + "destination_config", + destinations_configs(default_sql_configs=True, subset=["postgres"]), + ids=lambda x: x.name, +) +@pytest.mark.parametrize( + "active_record_timestamp", + [ + date(9999, 12, 31), + datetime(9999, 12, 31), + pendulum.Date(9999, 12, 31), + pendulum.DateTime(9999, 12, 31), + "9999-12-31", + "9999-12-31T00:00:00", + "9999-12-31T00:00:00+00:00", + "9999-12-31T00:00:00+01:00", + ], +) +def test_active_record_timestamp( + destination_config: DestinationTestConfiguration, + active_record_timestamp: Optional[TAnyDateTime], +) -> None: + p = destination_config.setup_pipeline("abstract", 
full_refresh=True) + + @dlt.resource( + table_name="dim_test", + write_disposition={ + "disposition": "merge", + "strategy": "scd2", + "active_record_timestamp": active_record_timestamp, + }, + ) + def r(): + yield {"foo": "bar"} + + p.run(r()) + actual_active_record_timestamp = ensure_pendulum_datetime( + load_tables_to_dicts(p, "dim_test")["dim_test"][0]["_dlt_valid_to"] + ) + assert actual_active_record_timestamp == ensure_pendulum_datetime(active_record_timestamp) + + @pytest.mark.parametrize( "destination_config", destinations_configs(default_sql_configs=True, subset=["duckdb"]), @@ -571,7 +648,7 @@ def r(data): {from_: ts_1, to: ts_2, "nk": 1, "c1": "foo", "row_hash": "mocked_hash_1"}, { from_: ts_2, - to: get_active_ts(p), + to: None, "nk": 1, "c1": "foo_upd", "row_hash": "mocked_hash_1_upd",