Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce hard_delete and dedup_sort columns hint for merge #960

Merged
merged 32 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
82c3634
black formatting
Feb 12, 2024
97c5512
remove unused exception
Feb 12, 2024
400d84b
add initial support for replicate write disposition
Feb 12, 2024
24f362e
add hard_delete hint and sorted deduplication for merge
Feb 14, 2024
f3a4878
undo config change
Feb 14, 2024
deb816f
undo unintentional changes
Feb 14, 2024
4a38d56
refactor hard_delete handling and introduce dedup_sort hint
Feb 15, 2024
0d1c977
update docstring
Feb 15, 2024
474d8bc
replace dialect-specific SQL
Feb 16, 2024
568ef26
add parentheses to ensure proper clause evaluation order
Feb 16, 2024
81ea426
add escape defaults and temp tables for non-primary key case
Feb 16, 2024
a04a238
exclude destinations that don't support merge from test
Feb 17, 2024
8ac0f9c
correct typo
Feb 20, 2024
ec115e9
extend docstring
Feb 20, 2024
a1afeb8
remove redundant copies for immutable strings
Feb 20, 2024
f07205d
simplify boolean logic
Feb 20, 2024
a64580d
add more test cases for hard_delete and dedup_sort hints
Feb 20, 2024
3308549
refactor table chain resolution
Feb 21, 2024
189c2fb
marks tables that have seen data in normalizer, skips empty jobs if never …
rudolfix Feb 22, 2024
a649b0e
ignores tables that didn't see data when loading, tests edge cases
rudolfix Feb 22, 2024
9778f0e
Merge branch 'devel' into 947-core-extensions-to-support-database-rep…
rudolfix Feb 22, 2024
4b3c59b
add sort order configuration option
Feb 22, 2024
c984c4e
bumps schema engine to v9, adds migrations
rudolfix Feb 22, 2024
935748a
filters tables without data properly in load
rudolfix Feb 22, 2024
d125556
converts seen-data to boolean, fixes tests
rudolfix Feb 22, 2024
ecaf6ef
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 22, 2024
af0b344
disables filesystem tests config due to merge present
rudolfix Feb 22, 2024
262018b
add docs for hard_delete and dedup_sort column hints
Feb 22, 2024
0814bb0
Merge branch '947-core-extensions-to-support-database-replication' of…
Feb 22, 2024
44a9ff2
fixes extending table chains in load
rudolfix Feb 23, 2024
9384148
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 23, 2024
9921b89
refactors load and adds unit tests with dummy
rudolfix Feb 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 52 additions & 6 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
Generic,
Final,
)
from contextlib import contextmanager
import datetime # noqa: 251
from copy import deepcopy
import inspect
Expand All @@ -32,18 +31,22 @@
UnknownDestinationModule,
)
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.schema.exceptions import SchemaException
from dlt.common.schema.utils import (
get_write_disposition,
get_table_format,
get_columns_names_with_prop,
has_column_with_prop,
get_first_column_name_with_prop,
)
from dlt.common.configuration import configspec, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
from dlt.common.destination.capabilities import DestinationCapabilitiesContext
from dlt.common.schema.utils import is_complete_column
from dlt.common.schema.exceptions import UnknownTableException
from dlt.common.storages import FileStorage
from dlt.common.storages.load_storage import ParsedLoadJobFileName
from dlt.common.utils import get_module_name
from dlt.common.configuration.specs import GcpCredentials, AwsCredentialsWithoutDefaults


Expand Down Expand Up @@ -345,6 +348,49 @@ def _verify_schema(self) -> None:
table_name,
self.capabilities.max_identifier_length,
)
if has_column_with_prop(table, "hard_delete"):
if len(get_columns_names_with_prop(table, "hard_delete")) > 1:
raise SchemaException(
f'Found multiple "hard_delete" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "hard_delete"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "hard_delete" column hint for column "{get_first_column_name_with_prop(table, 'hard_delete')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "hard_delete" column hint is only applied when using'
' the "merge" write disposition.'
)
if has_column_with_prop(table, "dedup_sort"):
if len(get_columns_names_with_prop(table, "dedup_sort")) > 1:
raise SchemaException(
f'Found multiple "dedup_sort" column hints for table "{table_name}" in'
f' schema "{self.schema.name}" while only one is allowed:'
f' {", ".join(get_columns_names_with_prop(table, "dedup_sort"))}.'
)
if table.get("write_disposition") in ("replace", "append"):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when using'
' the "merge" write disposition.'
)
if table.get("write_disposition") == "merge" and not has_column_with_prop(
table, "primary_key"
):
logger.warning(
f"""The "dedup_sort" column hint for column "{get_first_column_name_with_prop(table, 'dedup_sort')}" """
f'in table "{table_name}" with write disposition'
f' "{table.get("write_disposition")}"'
f' in schema "{self.schema.name}" will be ignored.'
' The "dedup_sort" column hint is only applied when a'
" primary key has been specified."
)
for column_name, column in dict(table["columns"]).items():
if len(column_name) > self.capabilities.max_column_identifier_length:
raise IdentifierTooLongException(
Expand Down
3 changes: 2 additions & 1 deletion dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ def resolve_contract_settings_for_table(
return Schema.expand_schema_contract_settings(settings)

def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchema:
"""Adds or merges `partial_table` into the schema. Identifiers are not normalized"""
table_name = partial_table["name"]
parent_table_name = partial_table.get("parent")
# check if parent table present
Expand All @@ -414,7 +415,7 @@ def update_table(self, partial_table: TPartialTableSchema) -> TPartialTableSchem
return partial_table

def update_schema(self, schema: "Schema") -> None:
"""Updates this schema from an incoming schema"""
"""Updates this schema from an incoming schema. Normalizes identifiers after updating normalizers."""
# update all tables
for table in schema.tables.values():
self.update_table(table)
Expand Down
2 changes: 2 additions & 0 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class TColumnSchema(TColumnSchemaBase, total=False):
root_key: Optional[bool]
merge_key: Optional[bool]
variant: Optional[bool]
hard_delete: Optional[bool]
dedup_sort: Optional[bool]


TTableSchemaColumns = Dict[str, TColumnSchema]
Expand Down
17 changes: 17 additions & 0 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,23 @@ def get_columns_names_with_prop(
]


def get_first_column_name_with_prop(
    table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
) -> Optional[str]:
    """Returns the first column name in `table` that carries `column_prop`, or None when there is none.

    The lookup is delegated to `get_columns_names_with_prop`; `include_incomplete` is
    forwarded to it unchanged, and the first entry of its result is returned.
    """
    matching_names = get_columns_names_with_prop(table, column_prop, include_incomplete)
    # an empty result means no column carries the property
    return matching_names[0] if matching_names else None


def has_column_with_prop(
    table: TTableSchema, column_prop: Union[TColumnProp, str], include_incomplete: bool = False
) -> bool:
    """Tells whether at least one column in `table` schema carries property `column_prop`.

    The lookup is delegated to `get_columns_names_with_prop`; `include_incomplete` is
    forwarded to it unchanged.
    """
    matching_names = get_columns_names_with_prop(table, column_prop, include_incomplete)
    # a non-empty result means the property is present on some column
    return bool(matching_names)


def merge_schema_updates(schema_updates: Sequence[TSchemaUpdate]) -> TSchemaTables:
aggregated_update: TSchemaTables = {}
for schema_update in schema_updates:
Expand Down
8 changes: 8 additions & 0 deletions dlt/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,11 @@ def get_exception_trace_chain(
elif exc.__context__:
return get_exception_trace_chain(exc.__context__, traces, seen)
return traces


def order_deduped(lst: List[Any]) -> List[Any]:
    """Removes duplicates from `lst`, keeping the first occurrence of each element in its original position.

    Elements must be hashable because they are used as dictionary keys; an unhashable
    element raises `TypeError`. The input list is not modified.
    """
    # dict keys are unique and keep insertion order, so the first occurrence of each element wins
    first_occurrences = dict.fromkeys(lst)
    return [*first_occurrences]
20 changes: 10 additions & 10 deletions dlt/destinations/impl/athena/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ def _from_db_type(
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
Expand All @@ -376,19 +378,15 @@ def _get_table_update_sql(
# use qualified table names
qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name)
if is_iceberg and not generate_alter:
sql.append(
f"""CREATE TABLE {qualified_table_name}
sql.append(f"""CREATE TABLE {qualified_table_name}
({columns})
LOCATION '{location}'
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');"""
)
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
elif not generate_alter:
sql.append(
f"""CREATE EXTERNAL TABLE {qualified_table_name}
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
STORED AS PARQUET
LOCATION '{location}';"""
)
LOCATION '{location}';""")
# alter table to add new columns at the end
else:
sql.append(f"""ALTER TABLE {qualified_table_name} ADD COLUMNS ({columns});""")
Expand Down Expand Up @@ -455,7 +453,9 @@ def should_load_data_to_staging_dataset_on_staging_destination(
self, table: TTableSchema
) -> bool:
"""iceberg table data goes into staging on staging destination"""
return self._is_iceberg_table(self.get_load_table(table["name"]))
if self._is_iceberg_table(self.get_load_table(table["name"])):
return True
return super().should_load_data_to_staging_dataset_on_staging_destination(table)

def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema:
table = super().get_load_table(table_name, staging)
Expand Down
10 changes: 6 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,9 @@ def _get_table_update_sql(
elif (c := partition_list[0])["data_type"] == "date":
sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}"
elif (c := partition_list[0])["data_type"] == "timestamp":
sql[
0
] = f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
sql[0] = (
f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
)
# Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp.
# This is due to the bounds requirement of GENERATE_ARRAY function for partitioning.
# The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range.
Expand All @@ -272,7 +272,9 @@ def _get_table_update_sql(

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(c["name"])
return f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
return (
f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
)

def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
schema_table: TTableSchemaColumns = {}
Expand Down
28 changes: 18 additions & 10 deletions dlt/destinations/impl/databricks/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ def __init__(
else:
raise LoadJobTerminalException(
file_path,
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and azure buckets are supported",
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
" azure buckets are supported",
)
else:
raise LoadJobTerminalException(
file_path,
"Cannot load from local file. Databricks does not support loading from local files. Configure staging with an s3 or azure storage bucket.",
"Cannot load from local file. Databricks does not support loading from local files."
" Configure staging with an s3 or azure storage bucket.",
)

# decide on source format, stage_file_path will either be a local file or a bucket path
Expand All @@ -181,27 +183,33 @@ def __init__(
if not config.get("data_writer.disable_compression"):
raise LoadJobTerminalException(
file_path,
"Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
"Databricks loader does not support gzip compressed JSON files. Please disable"
" compression in the data writer configuration:"
" https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
)
if table_schema_has_type(table, "decimal"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.",
"Databricks loader cannot load DECIMAL type columns from json files. Switch to"
" parquet format to load decimals.",
)
if table_schema_has_type(table, "binary"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.",
"Databricks loader cannot load BINARY type columns from json files. Switch to"
" parquet format to load byte values.",
)
if table_schema_has_type(table, "complex"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.",
"Databricks loader cannot load complex columns (lists and dicts) from json"
" files. Switch to parquet format to load complex types.",
)
if table_schema_has_type(table, "date"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.",
"Databricks loader cannot load DATE type columns from json files. Switch to"
" parquet format to load dates.",
)

source_format = "JSON"
Expand Down Expand Up @@ -311,7 +319,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = Non

def _get_storage_table_query_columns(self) -> List[str]:
fields = super()._get_storage_table_query_columns()
fields[
1
] = "full_data_type" # Override because this is the only way to get data type with precision
fields[1] = ( # Override because this is the only way to get data type with precision
"full_data_type"
)
return fields
6 changes: 2 additions & 4 deletions dlt/destinations/impl/snowflake/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,13 @@ def __init__(
f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
" AUTO_COMPRESS = FALSE"
)
client.execute_sql(
f"""COPY INTO {qualified_table_name}
client.execute_sql(f"""COPY INTO {qualified_table_name}
{from_clause}
{files_clause}
{credentials_clause}
FILE_FORMAT = {source_format}
MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
"""
)
""")
if stage_file_path and not keep_staged_files:
client.execute_sql(f"REMOVE {stage_file_path}")

Expand Down
Loading