Introduce hard_delete and dedup_sort column hints for merge #960

Merged
32 commits merged on Feb 24, 2024

Changes shown below are from 3 of the 32 commits.

Commits
82c3634
black formatting
Feb 12, 2024
97c5512
remove unused exception
Feb 12, 2024
400d84b
add initial support for replicate write disposition
Feb 12, 2024
24f362e
add hard_delete hint and sorted deduplication for merge
Feb 14, 2024
f3a4878
undo config change
Feb 14, 2024
deb816f
undo unintentional changes
Feb 14, 2024
4a38d56
refactor hard_delete handling and introduce dedup_sort hint
Feb 15, 2024
0d1c977
update docstring
Feb 15, 2024
474d8bc
replace dialect-specific SQL
Feb 16, 2024
568ef26
add parentheses to ensure proper clause evaluation order
Feb 16, 2024
81ea426
add escape defaults and temp tables for non-primary key case
Feb 16, 2024
a04a238
exclude destinations that don't support merge from test
Feb 17, 2024
8ac0f9c
correct typo
Feb 20, 2024
ec115e9
extend docstring
Feb 20, 2024
a1afeb8
remove redundant copies for immutable strings
Feb 20, 2024
f07205d
simplify boolean logic
Feb 20, 2024
a64580d
add more test cases for hard_delete and dedup_sort hints
Feb 20, 2024
3308549
refactor table chain resolution
Feb 21, 2024
189c2fb
marks tables that have seen data in normalizer, skips empty jobs if never …
rudolfix Feb 22, 2024
a649b0e
ignores tables that didn't see data when loading, tests edge cases
rudolfix Feb 22, 2024
9778f0e
Merge branch 'devel' into 947-core-extensions-to-support-database-rep…
rudolfix Feb 22, 2024
4b3c59b
add sort order configuration option
Feb 22, 2024
c984c4e
bumps schema engine to v9, adds migrations
rudolfix Feb 22, 2024
935748a
filters tables without data properly in load
rudolfix Feb 22, 2024
d125556
converts seen-data to boolean, fixes tests
rudolfix Feb 22, 2024
ecaf6ef
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 22, 2024
af0b344
disables filesystem tests config due to merge present
rudolfix Feb 22, 2024
262018b
add docs for hard_delete and dedup_sort column hints
Feb 22, 2024
0814bb0
Merge branch '947-core-extensions-to-support-database-replication' of…
Feb 22, 2024
44a9ff2
fixes extending table chains in load
rudolfix Feb 23, 2024
9384148
Merge branch '947-core-extensions-to-support-database-replication' of…
rudolfix Feb 23, 2024
9921b89
refactors load and adds unit tests with dummy
rudolfix Feb 24, 2024
35 changes: 34 additions & 1 deletion dlt/common/schema/typing.py
@@ -61,7 +61,7 @@
"merge_key",
]
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TWriteDisposition = Literal["skip", "append", "replace", "merge", "replicate"]
TTableFormat = Literal["iceberg"]
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
@@ -150,6 +150,38 @@ class NormalizerInfo(TypedDict, total=True):
new_table: bool


class TCdcOperationMapperStr(TypedDict, total=True):
"""
Dictionary that informs dlt which string literals are used
in the change data to identify inserts, updates, and deletes.
"""

insert: str
update: str
delete: str


class TCdcOperationMapperInt(TypedDict, total=True):
"""
Dictionary that informs dlt which integer literals are used
in the change data to identify inserts, updates, and deletes.
"""

insert: int
update: int
delete: int


class TCdcConfig(TypedDict, total=True):
"""Dictionary that informs dlt how change data is organized."""

operation_column: str
"""Name of the column containing the operation type ("insert", "update", or "delete") for the change record."""
operation_mapper: Union[TCdcOperationMapperStr, TCdcOperationMapperInt]
sequence_column: str
"""Name of the column containing a sequence identifier that can be used to order the change records."""


# TypedDict that defines properties of a table


@@ -166,6 +198,7 @@ class TTableSchema(TypedDict, total=False):
columns: TTableSchemaColumns
resource: Optional[str]
table_format: Optional[TTableFormat]
cdc_config: Optional[TCdcConfig]
Collaborator review comment:
This is one way to go, but IMO a better way would be to define a column-level hint:
cdc_op, which could be an integer or a single char (u/d/i).

Do we really need a sequence? If so, we could reuse sort or add a new hint, e.g. cdc_seq. There are helper methods to find column(s) with hints.

It looks simpler to me.
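
To make the suggestion concrete, here is a hypothetical sketch of what such column-level hints could look like. cdc_op and cdc_seq do not exist in dlt; they only illustrate the reviewer's idea of marking the operation and sequence columns directly on the column schema instead of using a table-level cdc_config dict.

# Hypothetical column-level hints (cdc_op / cdc_seq are illustrative only, not real dlt hints)
columns = {
    "op": {"name": "op", "data_type": "text", "cdc_op": True},
    "lsn": {"name": "lsn", "data_type": "bigint", "cdc_seq": True},
}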



class TPartialTableSchema(TTableSchema):
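As a quick illustration of how the new typed dicts compose, here is a minimal sketch of a cdc_config value matching the definitions added above. The column names ("op", "lsn") and the mapper literals are hypothetical examples, not values prescribed by dlt.

from dlt.common.schema.typing import TCdcConfig

cdc_config: TCdcConfig = {
    "operation_column": "op",  # column that carries the change type of each record
    "operation_mapper": {"insert": "I", "update": "U", "delete": "D"},
    "sequence_column": "lsn",  # column used to order change records
}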
17 changes: 17 additions & 0 deletions dlt/common/schema/utils.py
@@ -37,6 +37,7 @@
TTypeDetections,
TWriteDisposition,
TSchemaContract,
TCdcConfig,
)
from dlt.common.schema.exceptions import (
CannotCoerceColumnException,
@@ -317,6 +318,19 @@ def validate_stored_schema(stored_schema: TStoredSchema) -> None:
if parent_table_name not in stored_schema["tables"]:
raise ParentTableNotFoundException(table_name, parent_table_name)

# check for "replicate" tables that miss a primary key or "cdc_config"
Collaborator review comment:
This makes sense, but we should move it to:

  1. the end of the normalize stage, OR
  2. the beginning of the load stage

At this moment the schema may still be partial; not all columns may be present (100% after the extract stage).

We should also check the merge disposition.

Also take a look at _verify_schema in JobClientBase; it looks like our place :)

if table.get("write_disposition") == "replicate":
if len(get_columns_names_with_prop(table, "primary_key", True)) == 0:
raise SchemaException(
f'Primary key missing for table "{table_name}" with "replicate" write'
" disposition."
)
if "cdc_config" not in table:
raise SchemaException(
f'"cdc_config" missing for table "{table_name}" with "replicate" write'
" disposition."
)

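The new check relies on get_columns_names_with_prop, which returns the names of columns carrying a given hint. A minimal sketch of how it is used here (the table dict below is a hypothetical example, not taken from the PR):

from dlt.common.schema.utils import get_columns_names_with_prop

table = {
    "name": "customers",
    "write_disposition": "replicate",
    "columns": {"id": {"name": "id", "data_type": "bigint", "primary_key": True}},
}
# returns ["id"], so the primary-key check above passes for this table
get_columns_names_with_prop(table, "primary_key", True)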

def migrate_schema(schema_dict: DictStrAny, from_engine: int, to_engine: int) -> TStoredSchema:
if from_engine == to_engine:
@@ -724,6 +738,7 @@ def new_table(
resource: str = None,
schema_contract: TSchemaContract = None,
table_format: TTableFormat = None,
cdc_config: TCdcConfig = None,
) -> TTableSchema:
table: TTableSchema = {
"name": table_name,
@@ -742,6 +757,8 @@ def new_table(
table["schema_contract"] = schema_contract
if table_format:
table["table_format"] = table_format
if cdc_config is not None:
table["cdc_config"] = cdc_config
if validate_schema:
validate_dict_ignoring_xkeys(
spec=TColumnSchema,
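Putting the pieces together, here is a minimal sketch of creating a table schema with the new parameter. The table name, columns, and cdc_config values are hypothetical; only the new_table signature shown in this diff is assumed.

from dlt.common.schema.utils import new_table

table = new_table(
    "customers",
    write_disposition="replicate",
    columns=[{"name": "id", "data_type": "bigint", "primary_key": True}],
    cdc_config={
        "operation_column": "op",
        "operation_mapper": {"insert": "I", "update": "U", "delete": "D"},
        "sequence_column": "lsn",
    },
)
# table["write_disposition"] == "replicate" and table["cdc_config"] is set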
16 changes: 7 additions & 9 deletions dlt/destinations/impl/athena/athena.py
@@ -351,7 +351,9 @@ def _from_db_type(
return self.type_mapper.from_db_type(hive_t, precision, scale)

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
return f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
return (
f"{self.sql_client.escape_ddl_identifier(c['name'])} {self.type_mapper.to_db_type(c, table_format)}"
)

def _get_table_update_sql(
self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool
@@ -376,19 +378,15 @@ def _get_table_update_sql(
# use qualified table names
qualified_table_name = self.sql_client.make_qualified_ddl_table_name(table_name)
if is_iceberg and not generate_alter:
sql.append(
f"""CREATE TABLE {qualified_table_name}
sql.append(f"""CREATE TABLE {qualified_table_name}
({columns})
LOCATION '{location}'
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');"""
)
TBLPROPERTIES ('table_type'='ICEBERG', 'format'='parquet');""")
elif not generate_alter:
sql.append(
f"""CREATE EXTERNAL TABLE {qualified_table_name}
sql.append(f"""CREATE EXTERNAL TABLE {qualified_table_name}
({columns})
STORED AS PARQUET
LOCATION '{location}';"""
)
LOCATION '{location}';""")
# alter table to add new columns at the end
else:
sql.append(f"""ALTER TABLE {qualified_table_name} ADD COLUMNS ({columns});""")
10 changes: 6 additions & 4 deletions dlt/destinations/impl/bigquery/bigquery.py
@@ -252,9 +252,9 @@ def _get_table_update_sql(
elif (c := partition_list[0])["data_type"] == "date":
sql[0] = f"{sql[0]}\nPARTITION BY {self.capabilities.escape_identifier(c['name'])}"
elif (c := partition_list[0])["data_type"] == "timestamp":
sql[
0
] = f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
sql[0] = (
f"{sql[0]}\nPARTITION BY DATE({self.capabilities.escape_identifier(c['name'])})"
)
# Automatic partitioning of an INT64 type requires us to be prescriptive - we treat the column as a UNIX timestamp.
# This is due to the bounds requirement of GENERATE_ARRAY function for partitioning.
# The 10,000 partitions limit makes it infeasible to cover the entire `bigint` range.
@@ -272,7 +272,9 @@

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
name = self.capabilities.escape_identifier(c["name"])
return f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
return (
f"{name} {self.type_mapper.to_db_type(c, table_format)} {self._gen_not_null(c.get('nullable', True))}"
)

def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]:
schema_table: TTableSchemaColumns = {}
28 changes: 18 additions & 10 deletions dlt/destinations/impl/databricks/databricks.py
@@ -166,12 +166,14 @@ def __init__(
else:
raise LoadJobTerminalException(
file_path,
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and azure buckets are supported",
f"Databricks cannot load data from staging bucket {bucket_path}. Only s3 and"
" azure buckets are supported",
)
else:
raise LoadJobTerminalException(
file_path,
"Cannot load from local file. Databricks does not support loading from local files. Configure staging with an s3 or azure storage bucket.",
"Cannot load from local file. Databricks does not support loading from local files."
" Configure staging with an s3 or azure storage bucket.",
)

# decide on source format, stage_file_path will either be a local file or a bucket path
@@ -181,27 +183,33 @@
if not config.get("data_writer.disable_compression"):
raise LoadJobTerminalException(
file_path,
"Databricks loader does not support gzip compressed JSON files. Please disable compression in the data writer configuration: https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
"Databricks loader does not support gzip compressed JSON files. Please disable"
" compression in the data writer configuration:"
" https://dlthub.com/docs/reference/performance#disabling-and-enabling-file-compression",
)
if table_schema_has_type(table, "decimal"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DECIMAL type columns from json files. Switch to parquet format to load decimals.",
"Databricks loader cannot load DECIMAL type columns from json files. Switch to"
" parquet format to load decimals.",
)
if table_schema_has_type(table, "binary"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load BINARY type columns from json files. Switch to parquet format to load byte values.",
"Databricks loader cannot load BINARY type columns from json files. Switch to"
" parquet format to load byte values.",
)
if table_schema_has_type(table, "complex"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load complex columns (lists and dicts) from json files. Switch to parquet format to load complex types.",
"Databricks loader cannot load complex columns (lists and dicts) from json"
" files. Switch to parquet format to load complex types.",
)
if table_schema_has_type(table, "date"):
raise LoadJobTerminalException(
file_path,
"Databricks loader cannot load DATE type columns from json files. Switch to parquet format to load dates.",
"Databricks loader cannot load DATE type columns from json files. Switch to"
" parquet format to load dates.",
)

source_format = "JSON"
@@ -311,7 +319,7 @@ def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None

def _get_storage_table_query_columns(self) -> List[str]:
fields = super()._get_storage_table_query_columns()
fields[
1
] = "full_data_type" # Override because this is the only way to get data type with precision
fields[1] = ( # Override because this is the only way to get data type with precision
"full_data_type"
)
return fields
6 changes: 2 additions & 4 deletions dlt/destinations/impl/snowflake/snowflake.py
@@ -175,15 +175,13 @@ def __init__(
f'PUT file://{file_path} @{stage_name}/"{load_id}" OVERWRITE = TRUE,'
" AUTO_COMPRESS = FALSE"
)
client.execute_sql(
f"""COPY INTO {qualified_table_name}
client.execute_sql(f"""COPY INTO {qualified_table_name}
{from_clause}
{files_clause}
{credentials_clause}
FILE_FORMAT = {source_format}
MATCH_BY_COLUMN_NAME='CASE_INSENSITIVE'
"""
)
""")
if stage_file_path and not keep_staged_files:
client.execute_sql(f"REMOVE {stage_file_path}")

18 changes: 16 additions & 2 deletions dlt/destinations/job_client_impl.py
@@ -250,7 +250,7 @@ def create_table_chain_completed_followup_jobs(
write_disposition = table_chain[0]["write_disposition"]
if write_disposition == "append":
jobs.extend(self._create_append_followup_jobs(table_chain))
elif write_disposition == "merge":
elif write_disposition in ("merge", "replicate"):
jobs.extend(self._create_merge_followup_jobs(table_chain))
elif write_disposition == "replace":
jobs.extend(self._create_replace_followup_jobs(table_chain))
@@ -581,10 +581,24 @@ def with_staging_dataset(self) -> Iterator["SqlJobClientBase"]:
self.in_staging_mode = False

def should_load_data_to_staging_dataset(self, table: TTableSchema) -> bool:
if table["write_disposition"] == "merge":
if table["write_disposition"] in ("merge", "replicate"):
return True
elif table["write_disposition"] == "replace" and (
self.config.replace_strategy in ["insert-from-staging", "staging-optimized"]
):
return True
return False

def _create_table_update(
self, table_name: str, storage_columns: TTableSchemaColumns
) -> Sequence[TColumnSchema]:
updates = super()._create_table_update(table_name, storage_columns)
table = self.schema.get_table(table_name)
if "write_disposition" in table and table["write_disposition"] == "replicate":
# operation and sequence columns should only be present in staging table
# not in final table
if not self.in_staging_mode:
op_col = table["cdc_config"]["operation_column"]
seq_col = table["cdc_config"]["sequence_column"]
updates = [d for d in updates if d["name"] not in (op_col, seq_col)]
return updates
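
A small hypothetical illustration of the filtering above: with a cdc_config naming "op" and "lsn", those two columns are dropped from the column updates for the final table, while the staging table (in_staging_mode) keeps them.

# Illustrative only; the column names come from a hypothetical cdc_config.
op_col, seq_col = "op", "lsn"
updates = [{"name": "id"}, {"name": "value"}, {"name": "op"}, {"name": "lsn"}]
final_updates = [d for d in updates if d["name"] not in (op_col, seq_col)]
# final_updates == [{"name": "id"}, {"name": "value"}]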