Merge branch 'devel' into 1055-implement-clickhouse-destination
# Conflicts:
#	dlt/destinations/sql_jobs.py
#	tests/load/pipeline/test_scd2.py
sh-rp committed Apr 25, 2024
2 parents 7627945 + 25a3321 commit 4d5db26
Showing 11 changed files with 318 additions and 96 deletions.
9 changes: 6 additions & 3 deletions dlt/common/schema/typing.py
@@ -17,7 +17,8 @@

from dlt.common.data_types import TDataType
from dlt.common.normalizers.typing import TNormalizersConfig
from dlt.common.typing import TSortOrder
from dlt.common.typing import TSortOrder, TAnyDateTime
from dlt.common.pendulum import pendulum

try:
from pydantic import BaseModel as _PydanticBaseModel
@@ -33,8 +34,6 @@
LOADS_TABLE_NAME = "_dlt_loads"
STATE_TABLE_NAME = "_dlt_pipeline_state"
DLT_NAME_PREFIX = "_dlt"
DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""

TColumnProp = Literal[
"name",
@@ -161,6 +160,9 @@ class NormalizerInfo(TypedDict, total=True):
WRITE_DISPOSITIONS: Set[TWriteDisposition] = set(get_args(TWriteDisposition))
MERGE_STRATEGIES: Set[TLoaderMergeStrategy] = set(get_args(TLoaderMergeStrategy))

DEFAULT_VALIDITY_COLUMN_NAMES = ["_dlt_valid_from", "_dlt_valid_to"]
"""Default values for validity column names used in `scd2` merge strategy."""


class TWriteDispositionDict(TypedDict):
disposition: TWriteDisposition
@@ -169,6 +171,7 @@ class TMergeDispositionDict(TWriteDispositionDict, total=False):
class TMergeDispositionDict(TWriteDispositionDict, total=False):
strategy: Optional[TLoaderMergeStrategy]
validity_column_names: Optional[List[str]]
active_record_timestamp: Optional[TAnyDateTime]
row_version_column_name: Optional[str]


12 changes: 11 additions & 1 deletion dlt/common/schema/utils.py
@@ -5,6 +5,8 @@
from copy import deepcopy, copy
from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional, Union

from dlt.common.pendulum import pendulum
from dlt.common.time import ensure_pendulum_datetime
from dlt.common.json import json
from dlt.common.data_types import TDataType
from dlt.common.exceptions import DictValidationException
@@ -481,7 +483,8 @@ def get_columns_names_with_prop(
return [
c["name"]
for c in table["columns"].values()
if bool(c.get(column_prop, False)) and (include_incomplete or is_complete_column(c))
if (bool(c.get(column_prop, False)) or c.get(column_prop, False) is None)
and (include_incomplete or is_complete_column(c))
]


@@ -525,6 +528,13 @@ def get_validity_column_names(table: TTableSchema) -> List[Optional[str]]:
]


def get_active_record_timestamp(table: TTableSchema) -> Optional[pendulum.DateTime]:
# method assumes a column with "x-active-record-timestamp" property exists
cname = get_first_column_name_with_prop(table, "x-active-record-timestamp")
hint_val = table["columns"][cname]["x-active-record-timestamp"] # type: ignore[typeddict-item]
return None if hint_val is None else ensure_pendulum_datetime(hint_val)


def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
aggregated_update: TSchemaTables = {}
for schema_update in schema_updates:
17 changes: 12 additions & 5 deletions dlt/destinations/sql_jobs.py
@@ -12,6 +12,7 @@
get_first_column_name_with_prop,
get_dedup_sort_tuple,
get_validity_column_names,
get_active_record_timestamp,
DEFAULT_MERGE_STRATEGY,
)
from dlt.common.storages.load_storage import ParsedLoadJobFileName
@@ -541,14 +542,20 @@ def gen_scd2_sql(
current_load_package()["state"]["created_at"],
caps.timestamp_precision,
)
active_record_ts = format_datetime_literal(
caps.scd2_high_timestamp, caps.timestamp_precision
)
active_record_timestamp = get_active_record_timestamp(root_table)
if active_record_timestamp is None:
active_record_literal = "NULL"
is_active_clause = f"{to} IS NULL"
else: # it's a datetime
active_record_literal = format_datetime_literal(
active_record_timestamp, caps.timestamp_precision
)
is_active_clause = f"{to} = {active_record_literal}"

# retire updated and deleted records
sql.append(f"""
{cls.gen_update_table_prefix(root_table_name)} {to} = {boundary_ts}
WHERE {to} = {active_record_ts}
WHERE {is_active_clause}
AND {hash_} NOT IN (SELECT {hash_} FROM {staging_root_table_name});
""")

@@ -557,7 +564,7 @@
col_str = ", ".join([c for c in columns if c not in (from_, to)])
sql.append(f"""
INSERT INTO {root_table_name} ({col_str}, {from_}, {to})
SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_ts} AS {to}
SELECT {col_str}, {boundary_ts} AS {from_}, {active_record_literal} AS {to}
FROM {staging_root_table_name} AS s
WHERE {hash_} NOT IN (SELECT {hash_} FROM {root_table_name});
""")
6 changes: 2 additions & 4 deletions dlt/extract/hints.py
@@ -443,11 +443,9 @@ def _merge_merge_disposition_dict(dict_: Dict[str, Any]) -> None:
"data_type": "timestamp",
"nullable": True,
"x-valid-to": True,
"x-active-record-timestamp": mddict.get("active_record_timestamp"),
}
if mddict.get("row_version_column_name") is None:
hash_ = "_dlt_id"
else:
hash_ = mddict["row_version_column_name"]
hash_ = mddict.get("row_version_column_name", "_dlt_id")
dict_["columns"][hash_] = {
"name": hash_,
"nullable": False,
43 changes: 24 additions & 19 deletions dlt/sources/helpers/rest_client/paginators.py
@@ -9,7 +9,6 @@
class BasePaginator(ABC):
def __init__(self) -> None:
self._has_next_page = True
self._next_reference: Optional[str] = None

@property
def has_next_page(self) -> bool:
@@ -21,15 +20,6 @@ def has_next_page(self) -> bool:
"""
return self._has_next_page

@property
def next_reference(self) -> Optional[str]:
return self._next_reference

@next_reference.setter
def next_reference(self, value: Optional[str]) -> None:
self._next_reference = value
self._has_next_page = value is not None

@abstractmethod
def update_state(self, response: Response) -> None:
"""Update the paginator state based on the response.
@@ -107,15 +97,30 @@ def update_request(self, request: Request) -> None:
request.params[self.limit_param] = self.limit


class BaseNextUrlPaginator(BasePaginator):
class BaseReferencePaginator(BasePaginator):
def __init__(self) -> None:
super().__init__()
self.__next_reference: Optional[str] = None

@property
def _next_reference(self) -> Optional[str]:
return self.__next_reference

@_next_reference.setter
def _next_reference(self, value: Optional[str]) -> None:
self.__next_reference = value
self._has_next_page = value is not None


class BaseNextUrlPaginator(BaseReferencePaginator):
def update_request(self, request: Request) -> None:
# Handle relative URLs
if self.next_reference:
parsed_url = urlparse(self.next_reference)
if self._next_reference:
parsed_url = urlparse(self._next_reference)
if not parsed_url.scheme:
self.next_reference = urljoin(request.url, self.next_reference)
self._next_reference = urljoin(request.url, self._next_reference)

request.url = self.next_reference
request.url = self._next_reference


class HeaderLinkPaginator(BaseNextUrlPaginator):
@@ -136,7 +141,7 @@ def __init__(self, links_next_key: str = "next") -> None:
self.links_next_key = links_next_key

def update_state(self, response: Response) -> None:
self.next_reference = response.links.get(self.links_next_key, {}).get("url")
self._next_reference = response.links.get(self.links_next_key, {}).get("url")


class JSONResponsePaginator(BaseNextUrlPaginator):
@@ -158,10 +163,10 @@ def __init__(

def update_state(self, response: Response) -> None:
values = jsonpath.find_values(self.next_url_path, response.json())
self.next_reference = values[0] if values else None
self._next_reference = values[0] if values else None


class JSONResponseCursorPaginator(BasePaginator):
class JSONResponseCursorPaginator(BaseReferencePaginator):
"""A paginator that uses a cursor query param to paginate. The cursor for the
next page is found in the JSON response.
"""
@@ -182,7 +187,7 @@ def __init__(

def update_state(self, response: Response) -> None:
values = jsonpath.find_values(self.cursor_path, response.json())
self.next_reference = values[0] if values else None
self._next_reference = values[0] if values else None

def update_request(self, request: Request) -> None:
if request.params is None:
35 changes: 22 additions & 13 deletions docs/website/docs/general-usage/incremental-loading.md
@@ -242,8 +242,8 @@ In example above we enforce the root key propagation with `fb_ads.root_key = Tru
that correct data is propagated on initial `replace` load so the future `merge` load can be
executed. You can achieve the same in the decorator `@dlt.source(root_key=True)`.

### 🧪 `scd2` strategy
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A high timestamp (9999-12-31 00:00:00.000000) is used to indicate an active record.
### `scd2` strategy
`dlt` can create [Slowly Changing Dimension Type 2](https://en.wikipedia.org/wiki/Slowly_changing_dimension#Type_2:_add_new_row) (SCD2) destination tables for dimension tables that change in the source. The resource is expected to provide a full extract of the source table each run. A row hash is stored in `_dlt_id` and used as surrogate key to identify source records that have been inserted, updated, or deleted. A `NULL` value is used by default to indicate an active record, but it's possible to use a configurable high timestamp (e.g. 9999-12-31 00:00:00.000000) instead.

#### Example: `scd2` merge strategy
```py
@@ -265,8 +265,8 @@ pipeline.run(dim_customer()) # first run — 2024-04-09 18:27:53.734235

| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 |
| 2024-04-09 18:27:53.734235 | NULL | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |

```py
...
@@ -285,8 +285,8 @@ pipeline.run(dim_customer()) # second run — 2024-04-09 22:13:07.943703
| `_dlt_valid_from` | `_dlt_valid_to` | `customer_key` | `c1` | `c2` |
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | **2024-04-09 22:13:07.943703** | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | 9999-12-31 00:00:00.000000 | 2 | bar | 2 |
| **2024-04-09 22:13:07.943703** | **9999-12-31 00:00:00.000000** | **1** | **foo_updated** | **1** |
| 2024-04-09 18:27:53.734235 | NULL | 2 | bar | 2 |
| **2024-04-09 22:13:07.943703** | **NULL** | **1** | **foo_updated** | **1** |

```py
...
@@ -305,14 +305,9 @@ pipeline.run(dim_customer()) # third run — 2024-04-10 06:45:22.847403
| -- | -- | -- | -- | -- |
| 2024-04-09 18:27:53.734235 | 2024-04-09 22:13:07.943703 | 1 | foo | 1 |
| 2024-04-09 18:27:53.734235 | **2024-04-10 06:45:22.847403** | 2 | bar | 2 |
| 2024-04-09 22:13:07.943703 | 9999-12-31 00:00:00.000000 | 1 | foo_updated | 1 |
| 2024-04-09 22:13:07.943703 | NULL | 1 | foo_updated | 1 |

:::caution
SCD2 is still work in progress. We plan to change the default **high timestamp** from `9999-12-31 00:00:00.000000` to `NULL`
and make it configurable. This feature will be released with `dlt` 0.4.10
:::

#### Example: customize validity column names
#### Example: configure validity column names
`_dlt_valid_from` and `_dlt_valid_to` are used by default as validity column names. Other names can be configured as follows:
```py
@dlt.resource(
@@ -327,6 +322,20 @@ def dim_customer():
...
```

#### Example: configure active record timestamp
You can configure the literal used to indicate an active record with `active_record_timestamp`. The default literal `NULL` is used if `active_record_timestamp` is omitted or set to `None`. Provide a date value if you prefer to use a high timestamp instead.
```py
@dlt.resource(
write_disposition={
"disposition": "merge",
"strategy": "scd2",
"active_record_timestamp": "9999-12-31", # e.g. datetime.datetime(9999, 12, 31) is also accepted
}
)
def dim_customer():
...
```

#### Example: use your own row hash
By default, `dlt` generates a row hash based on all columns provided by the resource and stores it in `_dlt_id`. You can use your own hash instead by specifying `row_version_column_name` in the `write_disposition` dictionary. You might already have a column present in your resource that can naturally serve as row hash, in which case it's more efficient to use those pre-existing hash values than to generate new artificial ones. This option also allows you to use hashes based on a subset of columns, in case you want to ignore changes in some of the columns. When using your own hash, values for `_dlt_id` are randomly generated.
111 changes: 111 additions & 0 deletions docs/website/docs/reference/frequently-asked-questions.md
@@ -0,0 +1,111 @@
---
title: Frequently Asked Questions
description: Questions frequently asked by users in technical help channels or GitHub issues
keywords: [faq, usage information, technical help]
---


## Can I configure different nesting levels for each resource?

Currently, configuring different nesting levels for each resource directly is not supported, but there's an open GitHub issue ([#945](https://github.com/dlt-hub/dlt/issues/945)) addressing this. Meanwhile, you can use these workarounds.

Resources can be separated based on their nesting needs: for example, run the pipeline separately for groups of resources that require different maximum table nesting levels.

**For resources that don't require nesting (resource1, resource2), configure nesting as:**

```py
source_data = my_source.with_resources("resource1", "resource2")
source_data.max_table_nesting = 0
load_info = pipeline.run(source_data)
```

**For resources that require deeper nesting (resource3, resource4), configure nesting as:**

```py
source_data = my_source.with_resources("resource3", "resource4")
source_data.max_table_nesting = 2
load_info = pipeline.run(source_data)
```

**Apply hints for complex columns**

If certain columns should not be normalized, you can mark them as `complex`. This can be done in two ways.

1. When fetching the source data.
```py
source_data = my_source()
source_data.resource3.apply_hints(
columns={
"column_name": {
"data_type": "complex"
}
}
)
```

1. During resource definition.
```py
@dlt.resource(columns={"column_name": {"data_type": "complex"}})
def my_resource():
# Function body goes here
pass
```
In either case, the specified column (`column_name`) will not be broken down into nested tables, regardless of the data structure.

These methods allow for a degree of customization in handling data structure and loading processes, serving as interim solutions until a direct feature is developed.

## Can I configure dlt to load data in chunks of 10,000 records for more efficient processing, and how does this affect data resumption and retries in case of failures?

`dlt` buffers to disk and has built-in resume and retry mechanisms, which makes manually managing atomicity after the fact less beneficial unless you're running serverless. If you choose to load every 10k records instead, you could see benefits such as quicker data availability if you're actively reading it, and easier resumption from the last loaded point in case of failure, assuming state is well managed and records are sorted.

It's worth noting that `dlt` ships a drop-in replacement for the `requests` library with [built-in retries](../../docs/reference/performance#using-the-built-in-requests-client). This means that even if you pull 10 million records one by one, your data should remain safe in the face of network issues. To resume jobs after a failure, however, the pipeline needs to run in its own virtual machine (VM); ephemeral runtimes like Cloud Run don't support job resumption.
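
For reference, here is a minimal sketch of the built-in client in use; the endpoint and token below are placeholders:

```py
from dlt.sources.helpers import requests  # drop-in replacement for the requests library

# hypothetical endpoint and token, for illustration only
response = requests.get(
    "https://api.example.com/v1/records",
    headers={"Authorization": "Bearer <token>"},
)
response.raise_for_status()
records = response.json()
```

Retries on transient errors are handled by the client, so the call above behaves like a regular `requests.get` with added resilience.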

## How to contribute a verified source?

To submit your source code to our verified source repository, please refer to [this guide](https://github.com/dlt-hub/verified-sources/blob/master/CONTRIBUTING.md).

## What if the source I need isn't listed in the verified sources?

If the source you need isn't listed in the [verified sources](../../docs/dlt-ecosystem/verified-sources/), you can create your own pipeline by following [this guide](../../docs/walkthroughs/create-a-pipeline).
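
As a starting point, a custom pipeline can be as small as the sketch below; the endpoint is a placeholder and `duckdb` is used only as an example destination:

```py
import dlt
from dlt.sources.helpers import requests

@dlt.resource(name="issues", write_disposition="append")
def issues():
    # hypothetical endpoint, for illustration only
    response = requests.get("https://api.example.com/issues")
    response.raise_for_status()
    yield response.json()

pipeline = dlt.pipeline(
    pipeline_name="my_custom_pipeline",
    destination="duckdb",
    dataset_name="issues_data",
)
load_info = pipeline.run(issues())
print(load_info)
```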

## How can I retrieve the complete schema of a `dlt` source?

To retrieve the complete schema of a `dlt` source as `dlt` recognizes it, you need to execute both the extract and normalization steps. This process can be conducted on a small sample size to avoid processing large volumes of data. You can limit the sample size by applying an `add_limit` to your resources. Here is a sample code snippet demonstrating this process:

```py
# Initialize a new DLT pipeline with specified properties
p = dlt.pipeline(
pipeline_name="my_pipeline",
destination="duckdb",
dataset_name="my_dataset"
)

# Extract data using the predefined source `my_source`
p.extract(my_source().add_limit(10))

# Normalize the data structure for consistency
p.normalize()

# Print the default schema of the pipeline in pretty YAML format for review
print(p.default_schema.to_pretty_yaml())
```

This method ensures you obtain the full schema details, including all columns, as seen by `dlt`, without needing a predefined contract on these resources.

## Is truncating or deleting a staging table safe?

You can safely truncate staging tables or even drop the whole staging dataset. However, they will have to be recreated on the next load, which might incur extra loading time or cost.
You can also delete the dataset with Python using the [BigQuery client](https://cloud.google.com/bigquery/docs/samples/bigquery-delete-dataset#bigquery_delete_dataset-python).
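
As a sketch, assuming your staging dataset follows the usual `<dataset_name>_staging` naming, deleting it with the BigQuery client looks roughly like this:

```py
from google.cloud import bigquery

client = bigquery.Client()

# the dataset id below is an example, adjust it to your project and dataset
client.delete_dataset(
    "my_project.my_dataset_staging",
    delete_contents=True,  # also drop the tables inside the dataset
    not_found_ok=True,     # don't raise if the dataset is already gone
)
```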

## How can I develop a "custom" pagination tracker?

You can use `dlt.sources.incremental` to create a custom cursor for tracking pagination in data streams that lack a specific cursor field. An example can be found in [Incremental loading with a cursor field](https://deploy-preview-1204--dlt-hub-docs.netlify.app/docs/general-usage/incremental-loading#incremental-loading-with-a-cursor-field).

Alternatively, you can manage the state directly in Python. You can access and modify the state like a standard Python dictionary:
```py
state = dlt.current.resource_state()
state["your_custom_key"] = "your_value"
```
This method allows you to create custom pagination logic based on your requirements. An example of using `resource_state()` for pagination can be found [here](https://dlthub.com/docs/general-usage/incremental-loading#custom-incremental-loading-with-pipeline-state).

However, be cautious about overusing the state dictionary, especially when you keep a substream for each user, as it might become unwieldy. A better strategy is often to track users incrementally and, upon updates, refresh the affected users' substreams in full. This keeps a custom pagination implementation efficient and manageable.
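
Putting it together, here is a minimal sketch of a resource that keeps its cursor in the resource state; the API shape and the `last_cursor` key are made up for illustration:

```py
import dlt
from dlt.sources.helpers import requests

@dlt.resource
def tickets():
    # "last_cursor" and the response fields below are hypothetical
    state = dlt.current.resource_state()
    cursor = state.get("last_cursor")
    while True:
        response = requests.get(
            "https://api.example.com/tickets",
            params={"cursor": cursor},
        )
        response.raise_for_status()
        page = response.json()
        yield page["items"]
        cursor = page.get("next_cursor")
        if cursor is None:
            break
        # persist the cursor so the next pipeline run resumes from this point
        state["last_cursor"] = cursor
```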
