scd2 extension: active record literal changes (#1275)
* change default active record literal type to null and make configurable

* replace active_record_literal_type with active_record_timestamp setting

* add test for active record timestamp varieties

* add docs for scd2 active_record_timestamp setting

* fix docs

---------

Co-authored-by: Jorrit Sandbrink <[email protected]>
Co-authored-by: Dave <[email protected]>
3 people authored Apr 25, 2024
1 parent d04f9f4 commit df6282b
Showing 6 changed files with 176 additions and 74 deletions.
9 changes: 6 additions & 3 deletions dlt/common/schema/typing.py
@@ -17,7 +17,8 @@

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder
from dlt.common.typing import TSortOrder, TAnyDateTime
from dlt.common.pendulum import pendulum

try:
from pydantic import BaseModel as _PydanticBaseModel
@@ -33,8 +34,6 @@
LOADS_TABLE_NAME = "_dlt_loads"
STATE_TABLE_NAME = "_dlt_pipeline_state"
DLT_NAME_PREFIX = "_dlt"
DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""

TColumnProp = Literal[
"name",
@@ -161,6 +160,9 @@ class NormalizerInfo(TypedDict, total=True):
WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
MERGE_STRATEGIES: Set[TLoaderMergeStrategy] = set(get_args(TLoaderMergeStrategy))

DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""


class TWriteDispositionDict(TypedDict):
disposition: TWriteDisposition
@@ -169,6 +171,7 @@
class TMergeDispositionDict(TWriteDispositionDict, total=False):
strategy: Optional[TLoaderMergeStrategy]
validity_column_names: Optional[List[str]]
active_record_timestamp: Optional[TAnyDateTime]
row_version_column_name: Optional[str]


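Taken together, the updated `TMergeDispositionDict` means a resource can now pass a dict like the following (a hedged sketch; the values are illustrative, and `active_record_timestamp` is the field added by this commit):

```py
# Sketch of a write disposition dict accepted after this change.
# All keys except "disposition" are optional (TypedDict, total=False).
write_disposition = {
    "disposition": "merge",
    "strategy": "scd2",
    "validity_column_names": ["valid_from", "valid_to"],
    "active_record_timestamp": "9999-12-31",  # new; None or omitted -> NULL literal
    "row_version_column_name": "row_hash",
}
```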
12 changes: 11 additions & 1 deletion dlt/common/schema/utils.py
@@ -5,6 +5,8 @@
from copy import deepcopy, copy
from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union

from dlt.common.pendulum import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.json import json
from dlt.common.data_types import TDataType
from dlt.common.exceptions import DictValidationException
@@ -481,7 +483,8 @@ def get_columns_names_with_prop(
return [
c["name"]
for c in table["columns"].values()
if bool(c.get(column_prop, False)) and (include_incomplete or is_complete_column(c))
if (bool(c.get(column_prop, False)) or c.get(column_prop, False) is None)
and (include_incomplete or is_complete_column(c))
]


@@ -525,6 +528,13 @@ def get_validity_column_names(table: TTableSchema) -> List[Optional[str]]:
]


def get_active_record_timestamp(table: TTableSchema) -> Optional[pendulum.DateTime]:
# method assumes a column with "x-active-record-timestamp" property exists
cname = get_first_column_name_with_prop(table, "x-active-record-timestamp")
hint_val = table["columns"][cname]["x-active-record-timestamp"] # type: ignore[typeddict-item]
return None if hint_val is None else ensure_pendulum_datetime(hint_val)


def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
aggregated_update: TSchemaTables = {}
for schema_update in schema_updates:
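A note on the predicate change in `get_columns_names_with_prop` above: an explicitly stored `None` hint now counts as present, which is what lets `get_active_record_timestamp` distinguish "use the `NULL` literal" (hint value `None`) from a configured high timestamp. A minimal sketch of that lookup semantics, using a simplified table dict (the `include_incomplete` handling is omitted):

```py
def columns_with_prop(table, prop):
    # a truthy hint OR an explicit None qualifies; an absent hint does not
    return [
        c["name"]
        for c in table["columns"].values()
        if bool(c.get(prop, False)) or c.get(prop, False) is None
    ]

table = {
    "columns": {
        "_dlt_valid_to": {"name": "_dlt_valid_to", "x-active-record-timestamp": None},
        "c1": {"name": "c1"},  # no hint -> excluded
    }
}
print(columns_with_prop(table, "x-active-record-timestamp"))  # ['_dlt_valid_to']
```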
19 changes: 12 additions & 7 deletions dlt/destinations/sql_jobs.py
@@ -13,6 +13,7 @@
get_first_column_name_with_prop,
get_dedup_sort_tuple,
get_validity_column_names,
get_active_record_timestamp,
DEFAULT_MERGE_STRATEGY,
)
from dlt.common.storages.load_storage import ParsedLoadJobFileName
@@ -24,10 +25,6 @@
from dlt.pipeline.current import load_package as current_load_package


HIGH_TS = pendulum.datetime(9999, 12, 31)
"""High timestamp used to indicate active records in `scd2` merge strategy."""


class SqlJobParams(TypedDict, total=False):
replace: Optional[bool]
table_chain_create_table_statements: Dict[str, Sequence[str]]
@@ -546,12 +543,20 @@ def gen_scd2_sql(
current_load_package()["state"]["created_at"],
caps.timestamp_precision,
)
active_record_ts = format_datetime_literal(HIGH_TS, caps.timestamp_precision)
active_record_timestamp = get_active_record_timestamp(root_table)
if active_record_timestamp is None:
active_record_literal = "NULL"
is_active_clause = f"{to} IS NULL"
else: # it's a datetime
active_record_literal = format_datetime_literal(
active_record_timestamp, caps.timestamp_precision
)
is_active_clause = f"{to} = {active_record_literal}"

# retire updated and deleted records
sql.append(f"""
UPDATE {root_table_name} SET {to} = {boundary_ts}
WHERE {to} = {active_record_ts}
WHERE {is_active_clause}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")

@@ -560,7 +565,7 @@
col_str = ", ".join([c for c in columns if c not in (from_, to)])
sql.append(f"""
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_ts} AS {to}
SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_literal} AS {to}
FROM {staging_root_table_name} AS s
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name});
""")
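The branching above exists because SQL's three-valued logic makes `{to} = NULL` evaluate to unknown rather than true, so the active-record predicate has to become `{to} IS NULL` when the literal is `NULL`. A sketch of the two retire statements this yields (identifiers are illustrative, not dlt's actual output):

```py
# Mirrors the branch in gen_scd2_sql; not the real dlt API.
to, boundary_ts = "_dlt_valid_to", "'2024-04-25 00:00:00.000000'"

for active_record_literal in ("NULL", "'9999-12-31 00:00:00.000000'"):
    is_active_clause = (
        f"{to} IS NULL"
        if active_record_literal == "NULL"
        else f"{to} = {active_record_literal}"
    )
    print(
        f"UPDATE dim_customer SET {to} = {boundary_ts}\n"
        f"WHERE {is_active_clause}\n"
        f"  AND _dlt_id NOT IN (SELECT _dlt_id FROM staging.dim_customer);\n"
    )
```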
6 changes: 2 additions & 4 deletions dlt/extract/hints.py
@@ -443,11 +443,9 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
"data_type": "timestamp",
"nullable": True,
"x-valid-to": True,
"x-active-record-timestamp": mddict.get("active_record_timestamp"),
}
if mddict.get("row_version_column_name") is None:
hash_ = "_dlt_id"
else:
hash_ = mddict["row_version_column_name"]
hash_ = mddict.get("row_version_column_name", "_dlt_id")
dict_["columns"][hash_] = {
"name": hash_,
"nullable": False,
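For intuition, the column hints this hint-merging step derives from a plain `{"disposition": "merge", "strategy": "scd2"}` dict would look roughly like this (a sketch covering only what is visible in the hunk; the hash column entry is truncated in the diff above):

```py
# Sketch of the derived column hints (default names, no custom settings):
columns = {
    "_dlt_valid_to": {
        "name": "_dlt_valid_to",
        "data_type": "timestamp",
        "nullable": True,
        "x-valid-to": True,
        # explicit None -> "active records are marked with the NULL literal"
        "x-active-record-timestamp": None,
    },
    "_dlt_id": {  # default when row_version_column_name is not set
        "name": "_dlt_id",
        "nullable": False,
        # ... row-version hint follows in the full source
    },
}
```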
35 changes: 22 additions & 13 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -242,8 +242,8 @@ In example above we enforce the root key propagation with `fb_ads.root_key = Tru
that correct data is propagated on initial `replace` load so the future `merge` load can be
executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

### 🧪 `scd2` strategy
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A high timestamp (9999-12-31 00:00:00.000000) is used to indicate an active record.
### `scd2` strategy
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as a surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g. 9999-12-31 00:00:00.000000) instead.

#### Example: `scd2` merge strategy
```py
@@ -265,8 +265,8 @@ pipeline.run(dim_customer()) # first run — 2024-04-09 18:27:53.734235

| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 |
| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |

```py
...
@@ -285,8 +285,8 @@ pipeline.run(dim_customer()) # second run — 2024-04-09 22:13:07.943703
| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 |
| **2024-04-09 22:13:07.943703** | **9999-12-31 00:00:00.000000** | **1** | **foo_updated** | **1** |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |
| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** |

```py
...
@@ -305,14 +305,9 @@ pipeline.run(dim_customer()) # third run — 2024-04-10 06:45:22.847403
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | 2024-04-09 22:13:07.943703 | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 |
| 2024-04-09 22:13:07.943703 | 9999-12-31 00:00:00.000000 | 1 | foo_updated | 1 |
| 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 |

:::caution
SCD2 is still work in progress. We plan to change the default **high timestamp** from `9999-12-31 00:00:00.000000` to `NULL`
and make it configurable. This feature will be released with `dlt` 0.4.10
:::

#### Example: customize validity column names
#### Example: configure validity column names
`_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows:
```py
@dlt.resource(
@@ -327,6 +322,20 @@ def dim_customer():
...
```

#### Example: configure active record timestamp
You can configure the literal used to indicate an active record with `active_record_timestamp`. The default literal `NULL` is used if `active_record_timestamp` is omitted or set to `None`. Provide a date value if you prefer to use a high timestamp instead.
```py
@dlt.resource(
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"active_record_timestamp": "9999-12-31", # e.g. datetime.datetime(9999, 12, 31) is also accepted
}
)
def dim_customer():
...
```
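The setting is typed as `TAnyDateTime`, and the diff above normalizes it with `ensure_pendulum_datetime` in `get_active_record_timestamp`, so ISO strings, `datetime` objects, and pendulum instances should all be interchangeable. A quick sketch of that assumption:

```py
import datetime
from dlt.common.time import ensure_pendulum_datetime

# both forms should normalize to the same pendulum UTC datetime
print(ensure_pendulum_datetime("9999-12-31"))
print(ensure_pendulum_datetime(datetime.datetime(9999, 12, 31)))
```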

#### Example: use your own row hash
By default, `dlt` generates a row hash based on all columns provided by the resource and stores it in `_dlt_id`. You can use your own hash instead by specifying `row_version_column_name` in the `write_disposition` dictionary. You might already have a column in your resource that can naturally serve as a row hash, in which case it's more efficient to use those pre-existing hash values than to generate new artificial ones. This option also allows you to use hashes based on a subset of columns, in case you want to ignore changes in some of the columns. When using your own hash, values for `_dlt_id` are randomly generated.
```py
@dlt.resource(
    write_disposition={
        "disposition": "merge",
        "strategy": "scd2",
        "row_version_column_name": "row_hash",
    }
)
def dim_customer():
    ...
```