Skip to content

Commit

Permalink
fixes schema merge behavior (#621)
Browse files Browse the repository at this point in the history
* adds case-insensitive naming convention to weaviate, handles property conflicts + tests

* fixes wrong prop name in aws credentials docs

* defines and tests merging behavior for tables and columns, stops adding default hints

* passes the hints to the database engine when adding columns

* fixes tests

* updates weaviate doc

* review fixes
  • Loading branch information
rudolfix authored Sep 12, 2023
1 parent 8b18e63 commit 82d8b9d
Show file tree
Hide file tree
Showing 40 changed files with 1,140 additions and 259 deletions.
2 changes: 1 addition & 1 deletion dlt/common/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from dlt.common.schema.typing import TSchemaUpdate, TSchemaTables, TTableSchema, TStoredSchema, TTableSchemaColumns, TColumnHint, TColumnSchema, TColumnSchemaBase # noqa: F401
from dlt.common.schema.typing import COLUMN_HINTS # noqa: F401
from dlt.common.schema.schema import Schema # noqa: F401
from dlt.common.schema.utils import add_missing_hints, verify_schema_hash # noqa: F401
from dlt.common.schema.utils import verify_schema_hash # noqa: F401
45 changes: 37 additions & 8 deletions dlt/common/schema/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def from_dict(cls, d: DictStrAny) -> "Schema":
# verify schema
utils.validate_stored_schema(stored_schema)
# add defaults
utils.apply_defaults(stored_schema)
stored_schema = utils.apply_defaults(stored_schema)

# bump version if modified
utils.bump_version_if_modified(stored_schema)
Expand Down Expand Up @@ -152,6 +152,16 @@ def _exclude(path: str, excludes: Sequence[REPattern], includes: Sequence[REPatt
return row

def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[DictStrAny, TPartialTableSchema]:
"""Fits values of fields present in `row` into a schema of `table_name`. Will coerce values into data types and infer new tables and column schemas.
Method expects that field names in row are already normalized.
* if table schema for `table_name` does not exist, new table is created
* if column schema for a field in `row` does not exist, it is inferred from data
* if incomplete column schema (no data type) exists, column is inferred from data and existing hints are applied
* fields with None value are removed
Returns tuple with row with coerced values and a partial table containing just the newly added columns or None if no changes were detected
"""
# get existing or create a new table
updated_table_partial: TPartialTableSchema = None
table = self._schema_tables.get(table_name)
Expand Down Expand Up @@ -216,7 +226,7 @@ def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: Str
for column_name in table:
if column_name in row:
hint_value = table[column_name][column_prop]
if (hint_value and column_prop != "nullable") or (column_prop == "nullable" and not hint_value):
if not utils.has_default_column_hint_value(column_prop, hint_value):
rv_row[column_name] = row[column_name]
except KeyError:
for k, v in row.items():
Expand All @@ -241,17 +251,32 @@ def merge_hints(self, new_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]]) -
self._compile_settings()

def normalize_table_identifiers(self, table: TTableSchema) -> TTableSchema:
"""Normalizes all table and column names in `table` schema according to current schema naming convention and returns
new normalized TTableSchema instance.
Naming convention like snake_case may produce name clashes with the column names. Clashing column schemas are merged
where the column that is defined later in the dictionary overrides earlier column.
Note that resource name is not normalized.
"""
# normalize all identifiers in table according to name normalizer of the schema
table["name"] = self.naming.normalize_tables_path(table["name"])
parent = table.get("parent")
if parent:
table["parent"] = self.naming.normalize_tables_path(parent)
columns = table.get("columns")
if columns:
new_columns: TTableSchemaColumns = {}
for c in columns.values():
c["name"] = self.naming.normalize_path(c["name"])
# re-index columns as the name changed
table["columns"] = {c["name"]:c for c in columns.values()}
new_col_name = c["name"] = self.naming.normalize_path(c["name"])
# re-index columns as the name changed, if name space was reduced then
# some columns now clash with each other. so make sure that we merge columns that are already there
if new_col_name in new_columns:
new_columns[new_col_name] = utils.merge_columns(new_columns[new_col_name], c, merge_defaults=False)
else:
new_columns[new_col_name] = c
table["columns"] = new_columns
return table

def get_new_table_columns(self, table_name: str, exiting_columns: TTableSchemaColumns, include_incomplete: bool = False) -> List[TColumnSchema]:
Expand Down Expand Up @@ -358,7 +383,10 @@ def _infer_column(self, k: str, v: Any, data_type: TDataType = None, is_variant:
nullable=not self._infer_hint("not_null", v, k)
)
for hint in COLUMN_HINTS:
column_schema[utils.hint_to_column_prop(hint)] = self._infer_hint(hint, v, k)
column_prop = utils.hint_to_column_prop(hint)
hint_value = self._infer_hint(hint, v, k)
if not utils.has_default_column_hint_value(column_prop, hint_value):
column_schema[column_prop] = hint_value

if is_variant:
column_schema["variant"] = is_variant
Expand Down Expand Up @@ -407,9 +435,10 @@ def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name:

if not existing_column:
inferred_column = self._infer_column(col_name, v, data_type=col_type, is_variant=is_variant)
# if there's partial new_column then merge it with inferred column
# if there's incomplete new_column then merge it with inferred column
if new_column:
new_column = utils.merge_columns(new_column, inferred_column, merge_defaults=True)
# use all values present in incomplete column to override inferred column - also the defaults
new_column = utils.merge_columns(inferred_column, new_column)
else:
new_column = inferred_column

Expand Down
5 changes: 4 additions & 1 deletion dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
LOADS_TABLE_NAME = "_dlt_loads"
STATE_TABLE_NAME = "_dlt_pipeline_state"

TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"]
TColumnProp = Literal["name", "data_type", "nullable", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "merge_key", "root_key"]
"""Known properties and hints of the column"""
# TODO: merge TColumnHint with TColumnProp
TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"]
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"]
TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]]
Expand Down
Loading

0 comments on commit 82d8b9d

Please sign in to comment.