From 82d8b9df4e077a6eb0e5010cf7e8a607ebe780f7 Mon Sep 17 00:00:00 2001 From: rudolfix Date: Tue, 12 Sep 2023 12:35:53 +0200 Subject: [PATCH] fixes schema merge behavior (#621) * adds case insensitive naming convention to weaviate, handles property conflicts + tests * fixes wrong prop name in aws credentials docs * defines and tests merging behavior for tables and columns, removes adding default hints * pass the hints to database engine when adding columns * fixes tests * updates weaviate doc * review fixes --- dlt/common/schema/__init__.py | 2 +- dlt/common/schema/schema.py | 45 ++- dlt/common/schema/typing.py | 5 +- dlt/common/schema/utils.py | 244 ++++++++------ dlt/destinations/bigquery/bigquery.py | 2 +- dlt/destinations/duckdb/duck.py | 2 +- dlt/destinations/job_client_impl.py | 10 +- dlt/destinations/postgres/postgres.py | 2 +- dlt/destinations/redshift/redshift.py | 2 +- dlt/destinations/snowflake/snowflake.py | 2 +- dlt/destinations/weaviate/ci_naming.py | 6 + dlt/destinations/weaviate/exceptions.py | 11 +- dlt/destinations/weaviate/naming.py | 15 +- dlt/destinations/weaviate/weaviate_client.py | 47 +-- dlt/extract/extract.py | 6 +- dlt/extract/schema.py | 78 +++-- dlt/extract/source.py | 9 +- dlt/extract/utils.py | 7 +- dlt/pipeline/pipeline.py | 10 +- .../docs/dlt-ecosystem/destinations/athena.md | 6 +- .../dlt-ecosystem/destinations/filesystem.md | 2 +- .../dlt-ecosystem/destinations/weaviate.md | 24 +- .../cases/schemas/eth/ethereum_schema_v6.yml | 2 +- tests/common/schema/test_inference.py | 30 +- tests/common/schema/test_merges.py | 301 ++++++++++++++++++ tests/common/schema/test_schema.py | 49 ++- tests/common/utils.py | 6 +- tests/conftest.py | 4 + tests/extract/test_decorators.py | 123 ++++++- tests/extract/test_sources.py | 2 +- .../bigquery/test_bigquery_table_builder.py | 19 -- tests/load/filesystem/test_aws_credentials.py | 22 +- tests/load/pipeline/test_restore_state.py | 6 +- .../redshift/test_redshift_table_builder.py | 9 - .../snowflake/test_snowflake_table_builder.py | 10 - tests/load/weaviate/test_naming.py | 28 +- tests/load/weaviate/test_pipeline.py | 53 ++- tests/load/weaviate/test_weaviate_client.py | 156 ++++++++- tests/load/weaviate/utils.py | 2 +- tests/pipeline/test_pipeline.py | 40 ++- 40 files changed, 1140 insertions(+), 259 deletions(-) create mode 100644 dlt/destinations/weaviate/ci_naming.py create mode 100644 tests/common/schema/test_merges.py diff --git a/dlt/common/schema/__init__.py b/dlt/common/schema/__init__.py index a574d9baf3..6db3f21cef 100644 --- a/dlt/common/schema/__init__.py +++ b/dlt/common/schema/__init__.py @@ -1,4 +1,4 @@ from dlt.common.schema.typing import TSchemaUpdate, TSchemaTables, TTableSchema, TStoredSchema, TTableSchemaColumns, TColumnHint, TColumnSchema, TColumnSchemaBase # noqa: F401 from dlt.common.schema.typing import COLUMN_HINTS # noqa: F401 from dlt.common.schema.schema import Schema # noqa: F401 -from dlt.common.schema.utils import add_missing_hints, verify_schema_hash # noqa: F401 +from dlt.common.schema.utils import verify_schema_hash # noqa: F401 diff --git a/dlt/common/schema/schema.py b/dlt/common/schema/schema.py index af0ecc7673..2889d776c5 100644 --- a/dlt/common/schema/schema.py +++ b/dlt/common/schema/schema.py @@ -66,7 +66,7 @@ def from_dict(cls, d: DictStrAny) -> "Schema": # verify schema utils.validate_stored_schema(stored_schema) # add defaults - utils.apply_defaults(stored_schema) + stored_schema = utils.apply_defaults(stored_schema) # bump version if modified 
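As an orientation for the review, a minimal sketch of the table and column merge semantics this patch defines in `dlt/common/schema/utils.py` (see the `diff_tables`/`merge_tables` hunks further down); the table name, column names and hint values here are made up for illustration:

```py
from dlt.common.schema import utils

# an existing table and an incoming partial table produced by a later extraction
table = utils.new_table("items", columns=[
    {"name": "id", "data_type": "bigint", "nullable": False},
])
partial = utils.new_table("items", columns=[
    {"name": "id", "data_type": "bigint", "nullable": False, "primary_key": True},
    {"name": "value", "data_type": "text"},
])
partial["write_disposition"] = "merge"

# merge_tables applies the diff in place and returns only what actually changed:
# the new column, the updated hints on "id" and the changed write disposition
diff = utils.merge_tables(table, partial)
assert set(diff["columns"]) == {"id", "value"}
assert table["write_disposition"] == "merge"
assert table["columns"]["id"]["primary_key"] is True
```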
utils.bump_version_if_modified(stored_schema) @@ -152,6 +152,16 @@ def _exclude(path: str, excludes: Sequence[REPattern], includes: Sequence[REPatt return row def coerce_row(self, table_name: str, parent_table: str, row: StrAny) -> Tuple[DictStrAny, TPartialTableSchema]: + """Fits values of fields present in `row` into a schema of `table_name`. Will coerce values into data types and infer new tables and column schemas. + + Method expects that field names in row are already normalized. + * if table schema for `table_name` does not exist, new table is created + * if column schema for a field in `row` does not exist, it is inferred from data + * if incomplete column schema (no data type) exists, column is inferred from data and existing hints are applied + * fields with None value are removed + + Returns tuple with row with coerced values and a partial table containing just the newly added columns or None if no changes were detected + """ # get existing or create a new table updated_table_partial: TPartialTableSchema = None table = self._schema_tables.get(table_name) @@ -216,7 +226,7 @@ def filter_row_with_hint(self, table_name: str, hint_type: TColumnHint, row: Str for column_name in table: if column_name in row: hint_value = table[column_name][column_prop] - if (hint_value and column_prop != "nullable") or (column_prop == "nullable" and not hint_value): + if not utils.has_default_column_hint_value(column_prop, hint_value): rv_row[column_name] = row[column_name] except KeyError: for k, v in row.items(): @@ -241,6 +251,15 @@ def merge_hints(self, new_hints: Mapping[TColumnHint, Sequence[TSimpleRegex]]) - self._compile_settings() def normalize_table_identifiers(self, table: TTableSchema) -> TTableSchema: + """Normalizes all table and column names in `table` schema according to current schema naming convention and returns + new normalized TTableSchema instance. + + Naming convention like snake_case may produce name clashes with the column names. Clashing column schemas are merged + where the column that is defined later in the dictionary overrides earlier column. + + Note that resource name is not normalized. + + """ # normalize all identifiers in table according to name normalizer of the schema table["name"] = self.naming.normalize_tables_path(table["name"]) parent = table.get("parent") @@ -248,10 +267,16 @@ def normalize_table_identifiers(self, table: TTableSchema) -> TTableSchema: table["parent"] = self.naming.normalize_tables_path(parent) columns = table.get("columns") if columns: + new_columns: TTableSchemaColumns = {} for c in columns.values(): - c["name"] = self.naming.normalize_path(c["name"]) - # re-index columns as the name changed - table["columns"] = {c["name"]:c for c in columns.values()} + new_col_name = c["name"] = self.naming.normalize_path(c["name"]) + # re-index columns as the name changed, if name space was reduced then + # some columns now clash with each other. 
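A hedged sketch of the column clash handling described in the docstring above, assuming the default snake_case convention reduces `userName` and `user_name` to the same identifier; the table and column names are illustrative:

```py
from dlt.common.schema import Schema, utils

schema = Schema("example")  # default snake_case naming convention
table = utils.new_table("Items", columns=[
    {"name": "userName", "data_type": "text", "nullable": True},
    {"name": "user_name", "data_type": "text", "primary_key": True},
])

norm = schema.normalize_table_identifiers(table)
# both columns normalize to "user_name"; the column defined later in the dict
# overrides the earlier one, so the merged schema keeps the primary_key hint
assert list(norm["columns"]) == ["user_name"]
assert norm["columns"]["user_name"]["primary_key"] is True
```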
so make sure that we merge columns that are already there + if new_col_name in new_columns: + new_columns[new_col_name] = utils.merge_columns(new_columns[new_col_name], c, merge_defaults=False) + else: + new_columns[new_col_name] = c + table["columns"] = new_columns return table def get_new_table_columns(self, table_name: str, exiting_columns: TTableSchemaColumns, include_incomplete: bool = False) -> List[TColumnSchema]: @@ -358,7 +383,10 @@ def _infer_column(self, k: str, v: Any, data_type: TDataType = None, is_variant: nullable=not self._infer_hint("not_null", v, k) ) for hint in COLUMN_HINTS: - column_schema[utils.hint_to_column_prop(hint)] = self._infer_hint(hint, v, k) + column_prop = utils.hint_to_column_prop(hint) + hint_value = self._infer_hint(hint, v, k) + if not utils.has_default_column_hint_value(column_prop, hint_value): + column_schema[column_prop] = hint_value if is_variant: column_schema["variant"] = is_variant @@ -407,9 +435,10 @@ def _coerce_non_null_value(self, table_columns: TTableSchemaColumns, table_name: if not existing_column: inferred_column = self._infer_column(col_name, v, data_type=col_type, is_variant=is_variant) - # if there's partial new_column then merge it with inferred column + # if there's incomplete new_column then merge it with inferred column if new_column: - new_column = utils.merge_columns(new_column, inferred_column, merge_defaults=True) + # use all values present in incomplete column to override inferred column - also the defaults + new_column = utils.merge_columns(inferred_column, new_column) else: new_column = inferred_column diff --git a/dlt/common/schema/typing.py b/dlt/common/schema/typing.py index 4a8fa858b3..d10091998a 100644 --- a/dlt/common/schema/typing.py +++ b/dlt/common/schema/typing.py @@ -18,8 +18,11 @@ LOADS_TABLE_NAME = "_dlt_loads" STATE_TABLE_NAME = "_dlt_pipeline_state" -TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"] TColumnProp = Literal["name", "data_type", "nullable", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "merge_key", "root_key"] +"""Known properties and hints of the column""" +# TODO: merge TColumnHint with TColumnProp +TColumnHint = Literal["not_null", "partition", "cluster", "primary_key", "foreign_key", "sort", "unique", "root_key", "merge_key"] +"""Known hints of a column used to declare hint regexes.""" TWriteDisposition = Literal["skip", "append", "replace", "merge"] TTypeDetections = Literal["timestamp", "iso_timestamp", "large_integer", "hexbytes_to_text", "wei_to_double"] TTypeDetectionFunc = Callable[[Type[Any], Any], Optional[TDataType]] diff --git a/dlt/common/schema/utils.py b/dlt/common/schema/utils.py index 94efa975e8..16029ae985 100644 --- a/dlt/common/schema/utils.py +++ b/dlt/common/schema/utils.py @@ -2,7 +2,7 @@ import base64 import hashlib -from copy import deepcopy +from copy import deepcopy, copy from typing import Dict, List, Sequence, Tuple, Type, Any, cast, Iterable, Optional from dlt.common import json @@ -14,7 +14,7 @@ from dlt.common.typing import DictStrAny, REPattern, is_dict_generic_type from dlt.common.validation import TCustomValidator, validate_dict, validate_dict_ignoring_xkeys from dlt.common.schema import detections -from dlt.common.schema.typing import (SCHEMA_ENGINE_VERSION, LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate, +from dlt.common.schema.typing import (COLUMN_HINTS, SCHEMA_ENGINE_VERSION, 
LOADS_TABLE_NAME, SIMPLE_REGEX_PREFIX, VERSION_TABLE_NAME, TColumnName, TPartialTableSchema, TSchemaTables, TSchemaUpdate, TSimpleRegex, TStoredSchema, TTableSchema, TTableSchemaColumns, TColumnSchemaBase, TColumnSchema, TColumnProp, TColumnHint, TTypeDetectionFunc, TTypeDetections, TWriteDisposition) from dlt.common.schema.exceptions import (CannotCoerceColumnException, ParentTableNotFoundException, SchemaEngineNoUpgradePathException, SchemaException, @@ -39,7 +39,11 @@ def normalize_schema_name(name: str) -> str: return snake_case.normalize_identifier(name) -def apply_defaults(stored_schema: TStoredSchema) -> None: +def apply_defaults(stored_schema: TStoredSchema) -> TStoredSchema: + """Applies default hint values to `stored_schema` in place + + Updates only complete column hints, incomplete columns are preserved intact + """ for table_name, table in stored_schema["tables"].items(): # overwrite name table["name"] = table_name @@ -49,34 +53,87 @@ def apply_defaults(stored_schema: TStoredSchema) -> None: table["write_disposition"] = DEFAULT_WRITE_DISPOSITION if table.get('resource') is None: table['resource'] = table_name - # add missing hints to columns for column_name in table["columns"]: # add default hints to tables - column = add_missing_hints(table["columns"][column_name]) + column = table["columns"][column_name] # overwrite column name column["name"] = column_name # set column with default - table["columns"][column_name] = column + # table["columns"][column_name] = column + return stored_schema def remove_defaults(stored_schema: TStoredSchema) -> TStoredSchema: + """Removes default values from `stored_schema` in place, returns the input for chaining + + Default values are removed from table schemas and complete column schemas. Incomplete columns are preserved intact. + """ clean_tables = deepcopy(stored_schema["tables"]) for table_name, t in clean_tables.items(): del t["name"] if t.get('resource') == table_name: del t['resource'] for c in t["columns"].values(): + # remove defaults only on complete columns + # if is_complete_column(c): + # remove_column_defaults(c) + # # restore "nullable" True because we want to have explicit nullability in stored schema + # c["nullable"] = c.get("nullable", True) # do not save names del c["name"] - # remove hints with default values - for h in list(c.keys()): - if isinstance(c[h], bool) and c[h] is False and h != "nullable": # type: ignore - del c[h] # type: ignore stored_schema["tables"] = clean_tables return stored_schema +def has_default_column_hint_value(hint: str, value: Any) -> bool: + """Checks if `value` is a default for `hint`. 
Only known column hints (COLUMN_HINTS) are checked""" + # remove all boolean hints that are False, except "nullable" which is removed when it is True + if hint in COLUMN_HINTS and value is False: + return True + if hint == "nullable" and value is True: + return True + return False + + +def remove_column_defaults(column_schema: TColumnSchema) -> TColumnSchema: + """Removes default values from `column_schema` in place, returns the input for chaining""" + # remove hints with default values + for h in list(column_schema.keys()): + if has_default_column_hint_value(h, column_schema[h]): # type: ignore + del column_schema[h] # type: ignore + elif column_schema[h] is None: # type: ignore + del column_schema[h] # type: ignore + return column_schema + + +def add_column_defaults(column: TColumnSchemaBase) -> TColumnSchema: + """Adds default boolean hints to column""" + return { + **{ # type:ignore + "nullable": True, + "partition": False, + "cluster": False, + "unique": False, + "sort": False, + "primary_key": False, + "foreign_key": False, + "root_key": False, + "merge_key": False + }, + **column + } + +# def add_complete_column_defaults(column: TColumnSchemaBase) -> TColumnSchema: +# """Adds default hints to `column` if it is completed, otherwise preserves `column` content intact + +# Always returns a shallow copy of `column` +# """ +# if is_complete_column(column): +# return add_column_defaults(column) +# return copy(column) # type: ignore + + def bump_version_if_modified(stored_schema: TStoredSchema) -> Tuple[int, str]: # if any change to schema document is detected then bump version and write new hash hash_ = generate_version_hash(stored_schema) @@ -291,24 +348,6 @@ def migrate_filters(group: str, filters: List[str]) -> None: return cast(TStoredSchema, schema_dict) -def add_missing_hints(column: TColumnSchemaBase) -> TColumnSchema: - # return # dict(column) # type: ignore - return { - **{ # type:ignore - "nullable": True, - "partition": False, - "cluster": False, - "unique": False, - "sort": False, - "primary_key": False, - "foreign_key": False, - "root_key": False, - "merge_key": False - }, - **column - } - - def autodetect_sc_type(detection_fs: Sequence[TTypeDetections], t: Type[Any], v: Any) -> TDataType: if detection_fs: for detection_fn in detection_fs: @@ -320,7 +359,7 @@ def autodetect_sc_type(detection_fs: Sequence[TTypeDetections], t: Type[Any], v: return None -def is_complete_column(col: TColumnSchema) -> bool: +def is_complete_column(col: TColumnSchemaBase) -> bool: """Returns true if column contains enough data to be created at the destination. Must contain a name and a data type. Other hints have defaults.""" return bool(col.get("name")) and bool(col.get("data_type")) @@ -332,23 +371,29 @@ def compare_complete_columns(a: TColumnSchema, b: TColumnSchema) -> bool: return a["data_type"] == b["data_type"] and a["name"] == b["name"] -def merge_columns(col_a: TColumnSchema, col_b: TColumnSchema, merge_defaults: bool = False) -> TColumnSchema: - """Merges `col_b` into `col_a`. if `merge_defaults` is True, only hints not present in `col_a` will be set.""" - # print(f"MERGE ({merge_defaults}) {col_b} into {col_a}") - for n, v in col_b.items(): - if col_a.get(n) is None or not merge_defaults: - col_a[n] = v # type: ignore +def merge_columns(col_a: TColumnSchema, col_b: TColumnSchema, merge_defaults: bool = True) -> TColumnSchema: + """Merges `col_b` into `col_a`. if `merge_defaults` is True, only hints from `col_b` that are not default in `col_a` will be set. 
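A short sketch of the new default handling together with both `merge_columns` modes, using made up hint values and assuming the helpers behave as implemented in the hunks around this docstring:

```py
from copy import copy
from dlt.common.schema import utils

col = {"name": "value", "data_type": "text", "nullable": True, "unique": False, "primary_key": True}

# boolean hints that are False and nullable=True count as defaults and are stripped
assert utils.remove_column_defaults(copy(col)) == {"name": "value", "data_type": "text", "primary_key": True}

existing = {"name": "value", "data_type": "text", "nullable": False}
# merge_defaults=True (the default): everything from the incoming column overrides
assert utils.merge_columns(copy(existing), col)["nullable"] is True
# merge_defaults=False: defaults in the incoming column are dropped first,
# so the explicit nullable=False on the existing column survives
assert utils.merge_columns(copy(existing), col, merge_defaults=False)["nullable"] is False
```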
+ + Modifies col_a in place and returns it + """ + col_b_clean = col_b if merge_defaults else remove_column_defaults(copy(col_b)) + for n, v in col_b_clean.items(): + col_a[n] = v # type: ignore + return col_a -def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema, ignore_table_name: bool = True) -> TPartialTableSchema: - """Creates a partial table that contains properties found in `tab_b` that are not present in `tab_a` or that can be updated. +def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema) -> TPartialTableSchema: + """Creates a partial table that contains properties found in `tab_b` that are not present or different in `tab_a`. + The name is always present in returned partial. + It returns new columns (not present in tab_a) and merges columns from tab_b into tab_a (overriding non-default hint values). + If any columns are returned they contain full data (not diffs of columns) + Raises SchemaException if tables cannot be merged + * when columns with the same name have different data types + * when table links to different parent tables """ table_name = tab_a["name"] - if not ignore_table_name and table_name != tab_b["name"]: - raise TablePropertiesConflictException(table_name, "name", table_name, tab_b["name"]) - # check if table properties can be merged if tab_a.get("parent") != tab_b.get("parent"): raise TablePropertiesConflictException(table_name, "parent", tab_a.get("parent"), tab_b.get("parent")) @@ -359,49 +404,68 @@ def diff_tables(tab_a: TTableSchema, tab_b: TPartialTableSchema, ignore_table_na for col_b_name, col_b in tab_b["columns"].items(): if col_b_name in tab_a_columns: col_a = tab_a_columns[col_b_name] - # we do not support changing existing columns + # we do not support changing data types of columns if is_complete_column(col_a) and is_complete_column(col_b): if not compare_complete_columns(tab_a_columns[col_b_name], col_b): # attempt to update to incompatible columns raise CannotCoerceColumnException(table_name, col_b_name, col_b["data_type"], tab_a_columns[col_b_name]["data_type"], None) - # else: - new_columns.append(merge_columns(col_a, col_b)) + # all other properties can change + merged_column = merge_columns(copy(col_a), col_b) + if merged_column != col_a: + new_columns.append(merged_column) else: new_columns.append(col_b) - # return partial table containing only name and properties that differ (column, filters etc.) - partial_table = new_table(table_name, columns=new_columns) - partial_table["write_disposition"] = None - # if tab_b.get("write_disposition") - # partial_table["write_disposition"] = tab_b.get("write_disposition") - # partial_table["description"] = tab_b.get("description") - # partial_table["filters"] = deepcopy(tab_b.get("filters")) + partial_table: TPartialTableSchema = { + "name": table_name, + "columns": {} if new_columns is None else {c["name"]: c for c in new_columns} + } + for k, v in tab_b.items(): + if k in ["columns", None]: + continue + existing_v = tab_a.get(k) + if existing_v != v: + print(f"{k} ==? {v} ==? 
{existing_v}") + partial_table[k] = v # type: ignore + + # this should not really happen + if tab_a.get('parent') is not None and (resource := tab_b.get('resource')): + raise TablePropertiesConflictException(table_name, "resource", resource, tab_a.get('parent')) + return partial_table -def compare_tables(tab_a: TTableSchema, tab_b: TTableSchema) -> bool: - try: - diff_table = diff_tables(tab_a, tab_b, ignore_table_name=False) - # columns cannot differ - return len(diff_table["columns"]) == 0 - except SchemaException: - return False +# def compare_tables(tab_a: TTableSchema, tab_b: TTableSchema) -> bool: +# try: +# table_name = tab_a["name"] +# if table_name != tab_b["name"]: +# raise TablePropertiesConflictException(table_name, "name", table_name, tab_b["name"]) +# diff_table = diff_tables(tab_a, tab_b, ignore_table_name=False) +# # columns cannot differ +# return len(diff_table["columns"]) == 0 +# except SchemaException: +# return False def merge_tables(table: TTableSchema, partial_table: TPartialTableSchema) -> TPartialTableSchema: - """Merges "partial_table" into "table", preserving the "table" name. Returns the diff partial table.""" + """Merges "partial_table" into "table". `table` is merged in place. Returns the diff partial table. + + `table` and `partial_table` names must be identical. A table diff is generated and applied to `table`: + * new columns are added, updated columns are replaced from diff + * table hints are added or replaced from diff + * nothing gets deleted + """ - diff_table = diff_tables(table, partial_table, ignore_table_name=True) + if table["name"] != partial_table["name"]: + raise TablePropertiesConflictException(table["name"], "name", table["name"], partial_table["name"]) + diff_table = diff_tables(table, partial_table) # add new columns when all checks passed table["columns"].update(diff_table["columns"]) - - partial_w_d = partial_table.get("write_disposition") - if partial_w_d: - table["write_disposition"] = partial_w_d - if table.get('parent') is None and (resource := partial_table.get('resource')): - table['resource'] = resource + updated_columns = table["columns"] + table.update(diff_table) + table["columns"] = updated_columns return diff_table @@ -489,36 +553,36 @@ def version_table() -> TTableSchema: # NOTE: always add new columns at the end of the table so we have identical layout # after an update of existing tables (always at the end) table = new_table(VERSION_TABLE_NAME, columns=[ - add_missing_hints({ + { "name": "version", "data_type": "bigint", "nullable": False, - }), - add_missing_hints({ + }, + { "name": "engine_version", "data_type": "bigint", "nullable": False - }), - add_missing_hints({ + }, + { "name": "inserted_at", "data_type": "timestamp", "nullable": False - }), - add_missing_hints({ + }, + { "name": "schema_name", "data_type": "text", "nullable": False - }), - add_missing_hints({ + }, + { "name": "version_hash", "data_type": "text", "nullable": False - }), - add_missing_hints({ + }, + { "name": "schema", "data_type": "text", "nullable": False - }) + } ] ) table["write_disposition"] = "skip" @@ -530,31 +594,31 @@ def load_table() -> TTableSchema: # NOTE: always add new columns at the end of the table so we have identical layout # after an update of existing tables (always at the end) table = new_table(LOADS_TABLE_NAME, columns=[ - add_missing_hints({ + { "name": "load_id", "data_type": "text", "nullable": False - }), - add_missing_hints({ + }, + { "name": "schema_name", "data_type": "text", "nullable": True - }), - add_missing_hints({ + 
}, + { "name": "status", "data_type": "bigint", "nullable": False - }), - add_missing_hints({ + }, + { "name": "inserted_at", "data_type": "timestamp", "nullable": False - }), - add_missing_hints({ + }, + { "name": "schema_version_hash", "data_type": "text", "nullable": True, - }), + }, ] ) table["write_disposition"] = "skip" @@ -573,7 +637,7 @@ def new_table( table: TTableSchema = { "name": table_name, - "columns": {} if columns is None else {c["name"]: add_missing_hints(c) for c in columns} + "columns": {} if columns is None else {c["name"]: c for c in columns} } if parent_table_name: table["parent"] = parent_table_name @@ -594,10 +658,10 @@ def new_table( def new_column(column_name: str, data_type: TDataType = None, nullable: bool = True, validate_schema: bool = False) -> TColumnSchema: - column = add_missing_hints({ + column: TColumnSchema = { "name": column_name, "nullable": nullable - }) + } if data_type: column["data_type"] = data_type if validate_schema: diff --git a/dlt/destinations/bigquery/bigquery.py b/dlt/destinations/bigquery/bigquery.py index c4a3dd6004..ffbc1b546b 100644 --- a/dlt/destinations/bigquery/bigquery.py +++ b/dlt/destinations/bigquery/bigquery.py @@ -229,7 +229,7 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc def _get_column_def_sql(self, c: TColumnSchema) -> str: name = self.capabilities.escape_identifier(c["name"]) - return f"{name} {self._to_db_type(c['data_type'])} {self._gen_not_null(c['nullable'])}" + return f"{name} {self._to_db_type(c['data_type'])} {self._gen_not_null(c.get('nullable', True))}" def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]: schema_table: TTableSchemaColumns = {} diff --git a/dlt/destinations/duckdb/duck.py b/dlt/destinations/duckdb/duck.py index b36164dc80..0a9fe1fb3b 100644 --- a/dlt/destinations/duckdb/duck.py +++ b/dlt/destinations/duckdb/duck.py @@ -105,7 +105,7 @@ def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> def _get_column_def_sql(self, c: TColumnSchema) -> str: hints_str = " ".join(self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c['nullable'])}" + return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c.get('nullable', True))}" @classmethod def _to_db_type(cls, sc_t: TDataType) -> str: diff --git a/dlt/destinations/job_client_impl.py b/dlt/destinations/job_client_impl.py index 4b691b5e77..e09aebc64c 100644 --- a/dlt/destinations/job_client_impl.py +++ b/dlt/destinations/job_client_impl.py @@ -13,7 +13,6 @@ from dlt.common import json, pendulum, logger from dlt.common.data_types import TDataType from dlt.common.schema.typing import COLUMN_HINTS, TColumnSchemaBase, TTableSchema, TWriteDisposition -from dlt.common.schema.utils import add_missing_hints from dlt.common.storages import FileStorage from dlt.common.schema import TColumnSchema, Schema, TTableSchemaColumns, TSchemaTables from dlt.common.destination.reference import StateInfo, StorageSchemaInfo,WithStateSync, DestinationClientConfiguration, DestinationClientDwhConfiguration, DestinationClientDwhWithStagingConfiguration, NewLoadJob, WithStagingDataset, TLoadJobState, LoadJob, JobClientBase, FollowupJob, CredentialsConfiguration @@ -242,7 +241,7 @@ def _null_to_bool(v: str) -> bool: "nullable": _null_to_bool(c[2]), "data_type": 
self._from_db_type(c[1], numeric_precision, numeric_scale), } - schema_table[c[0]] = add_missing_hints(schema_c) + schema_table[c[0]] = schema_c # type: ignore return True, schema_table @classmethod @@ -355,7 +354,12 @@ def _get_table_update_sql(self, table_name: str, new_columns: Sequence[TColumnSc for hint in COLUMN_HINTS: if any(c.get(hint, False) is True for c in new_columns): hint_columns = [self.capabilities.escape_identifier(c["name"]) for c in new_columns if c.get(hint, False)] - raise DestinationSchemaWillNotUpdate(canonical_name, hint_columns, f"{hint} requested after table was created") + if hint == "not_null": + logger.warning(f"Column(s) {hint_columns} with NOT NULL are being added to existing table {canonical_name}." + " If there's data in the table the operation will fail.") + else: + logger.warning(f"Column(s) {hint_columns} with hint {hint} are being added to existing table {canonical_name}." + " Several hint types may not be added to existing tables.") return sql_result @abstractmethod diff --git a/dlt/destinations/postgres/postgres.py b/dlt/destinations/postgres/postgres.py index a289768002..6e7e049bde 100644 --- a/dlt/destinations/postgres/postgres.py +++ b/dlt/destinations/postgres/postgres.py @@ -81,7 +81,7 @@ def __init__(self, schema: Schema, config: PostgresClientConfiguration) -> None: def _get_column_def_sql(self, c: TColumnSchema) -> str: hints_str = " ".join(self.active_hints.get(h, "") for h in self.active_hints.keys() if c.get(h, False) is True) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c['nullable'])}" + return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def _create_optimized_replace_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob: return PostgresStagingCopyJob.from_table_chain(table_chain, self.sql_client) diff --git a/dlt/destinations/redshift/redshift.py b/dlt/destinations/redshift/redshift.py index 70140ec3fd..2604f52c53 100644 --- a/dlt/destinations/redshift/redshift.py +++ b/dlt/destinations/redshift/redshift.py @@ -175,7 +175,7 @@ def _create_merge_job(self, table_chain: Sequence[TTableSchema]) -> NewLoadJob: def _get_column_def_sql(self, c: TColumnSchema) -> str: hints_str = " ".join(HINT_TO_REDSHIFT_ATTR.get(h, "") for h in HINT_TO_REDSHIFT_ATTR.keys() if c.get(h, False) is True) column_name = self.capabilities.escape_identifier(c["name"]) - return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c['nullable'])}" + return f"{column_name} {self._to_db_type(c['data_type'])} {hints_str} {self._gen_not_null(c.get('nullable', True))}" def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob: """Starts SqlLoadJob for files ending with .sql or returns None to let derived classes to handle their specific jobs""" diff --git a/dlt/destinations/snowflake/snowflake.py b/dlt/destinations/snowflake/snowflake.py index f5b7aea519..426a1e53a1 100644 --- a/dlt/destinations/snowflake/snowflake.py +++ b/dlt/destinations/snowflake/snowflake.py @@ -222,7 +222,7 @@ def _from_db_type(cls, bq_t: str, precision: Optional[int], scale: Optional[int] def _get_column_def_sql(self, c: TColumnSchema) -> str: name = self.capabilities.escape_identifier(c["name"]) - return f"{name} {self._to_db_type(c['data_type'])} {self._gen_not_null(c['nullable'])}" + return f"{name} {self._to_db_type(c['data_type'])} 
{self._gen_not_null(c.get('nullable', True))}" def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns]: table_name = table_name.upper() # All snowflake tables are uppercased in information schema diff --git a/dlt/destinations/weaviate/ci_naming.py b/dlt/destinations/weaviate/ci_naming.py new file mode 100644 index 0000000000..3b1c068133 --- /dev/null +++ b/dlt/destinations/weaviate/ci_naming.py @@ -0,0 +1,6 @@ +from .naming import NamingConvention as WeaviateNamingConvention + +class NamingConvention(WeaviateNamingConvention): + def _lowercase_property(self, identifier: str) -> str: + """Lowercase the whole property to become case insensitive""" + return identifier.lower() diff --git a/dlt/destinations/weaviate/exceptions.py b/dlt/destinations/weaviate/exceptions.py index 0177f64c51..adec0fee1e 100644 --- a/dlt/destinations/weaviate/exceptions.py +++ b/dlt/destinations/weaviate/exceptions.py @@ -1,5 +1,12 @@ -from dlt.common.exceptions import DestinationException +from dlt.common.exceptions import DestinationException, DestinationTerminalException class WeaviateBatchError(DestinationException): - pass \ No newline at end of file + pass + + +class PropertyNameConflict(DestinationTerminalException): + def __init__(self) -> None: + super().__init__("Your data contains items with identical property names when compared case insensitive. Weaviate cannot handle such data." + " Please clean up your data before loading or change to case insensitive naming convention." + " See https://dlthub.com/docs/dlt-ecosystem/destinations/weaviate#names-normalization for details.") diff --git a/dlt/destinations/weaviate/naming.py b/dlt/destinations/weaviate/naming.py index 1c43e0b58a..cf01983b90 100644 --- a/dlt/destinations/weaviate/naming.py +++ b/dlt/destinations/weaviate/naming.py @@ -18,17 +18,24 @@ class NamingConvention(SnakeCaseNamingConvention): _SPLIT_UNDERSCORE_NON_CAP = re.compile("(_[^A-Z])") def normalize_identifier(self, identifier: str) -> str: - """Normalizes Weaviate property name by removing not allowed characters, replacing them by _ and contracting multiple _ into single one""" + """Normalizes Weaviate property name by removing not allowed characters, replacing them by _ and contracting multiple _ into single one + and lowercasing the first character. + + """ identifier = BaseNamingConvention.normalize_identifier(self, identifier) if identifier in self.RESERVED_PROPERTIES: return self.RESERVED_PROPERTIES[identifier] norm_identifier = self._base_normalize(identifier) if self._STARTS_DIGIT.match(norm_identifier): norm_identifier = "p_" + norm_identifier + norm_identifier = self._lowercase_property(norm_identifier) return self.shorten_identifier(norm_identifier, identifier, self.max_length) def normalize_table_identifier(self, identifier: str) -> str: - """Creates Weaviate class name. Runs property normalization and then creates capitalized case name by splitting on _""" + """Creates Weaviate class name. 
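A hedged illustration of the difference between the default and the new case insensitive Weaviate naming conventions, assuming both classes can be instantiated standalone (without a max_length) as below:

```py
from dlt.destinations.weaviate.naming import NamingConvention
from dlt.destinations.weaviate.ci_naming import NamingConvention as CINamingConvention

# the default convention only lowercases the first character of a property name
assert NamingConvention().normalize_identifier("CamelCase") == "camelCase"
# the case insensitive convention lowercases the whole property, so names that
# differ only by case collapse into one and no longer conflict in Weaviate
assert CINamingConvention().normalize_identifier("CamelCase") == "camelcase"
```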
Runs property normalization and then creates capitalized case name by splitting on _ + + https://weaviate.io/developers/weaviate/configuration/schema-configuration#create-a-class + """ identifier = BaseNamingConvention.normalize_identifier(self, identifier) norm_identifier = self._base_normalize(identifier) # norm_identifier = norm_identifier.strip("_") @@ -38,6 +45,10 @@ def normalize_table_identifier(self, identifier: str) -> str: norm_identifier = "C" + norm_identifier return self.shorten_identifier(norm_identifier, identifier, self.max_length) + def _lowercase_property(self, identifier: str) -> str: + # lowercase the first letter to follow Weaviate guidelines on properties + return identifier[0].lower() + identifier[1:] + def _base_normalize(self, identifier: str) -> str: # all characters that are not letters digits or a few special chars are replaced with underscore normalized_ident = identifier.translate(self._TR_REDUCE_ALPHABET) diff --git a/dlt/destinations/weaviate/weaviate_client.py b/dlt/destinations/weaviate/weaviate_client.py index 42493609ae..bbd01f0381 100644 --- a/dlt/destinations/weaviate/weaviate_client.py +++ b/dlt/destinations/weaviate/weaviate_client.py @@ -47,7 +47,7 @@ from dlt.destinations.job_client_impl import StorageSchemaInfo, StateInfo from dlt.destinations.weaviate import capabilities from dlt.destinations.weaviate.configuration import WeaviateClientConfiguration -from dlt.destinations.weaviate.exceptions import WeaviateBatchError +from dlt.destinations.weaviate.exceptions import PropertyNameConflict, WeaviateBatchError SCT_TO_WT: Dict[TDataType, str] = { "text": "text", @@ -95,10 +95,14 @@ def _wrap(self: JobClientBase, *args: Any, **kwargs: Any) -> Any: except weaviate.exceptions.UnexpectedStatusCodeException as status_ex: # special handling for non existing objects/classes if status_ex.status_code == 404: - raise DestinationUndefinedEntity(status_ex) from status_ex - # looks like there are no more terminal exceptions - if status_ex.status_code in (403, 422): + raise DestinationUndefinedEntity(status_ex) + if status_ex.status_code == 403: raise DestinationTerminalException(status_ex) + if status_ex.status_code == 422: + if "conflict for property" in str(status_ex) or "none vectorizer module" in str(status_ex): + raise PropertyNameConflict() + raise DestinationTerminalException(status_ex) + # looks like there are no more terminal exception raise DestinationTransientException(status_ex) except weaviate.exceptions.WeaviateBaseError as we_ex: # also includes 401 as transient @@ -119,8 +123,10 @@ def _wrap(*args: Any, **kwargs: Any) -> Any: # TODO: actually put the job in failed/retry state and prepare exception message with full info on failing item if "invalid" in message and "property" in message and "on class" in message: raise DestinationTerminalException( - f"Batch failed {errors} AND WILL BE RETRIED" + f"Batch failed {errors} AND WILL **NOT** BE RETRIED" ) + if "conflict for property" in message: + raise PropertyNameConflict() raise DestinationTransientException( f"Batch failed {errors} AND WILL BE RETRIED" ) @@ -262,7 +268,7 @@ def create_db_client(config: WeaviateClientConfiguration) -> weaviate.Client: additional_headers=config.credentials.additional_headers, ) - def make_full_name(self, table_name: str) -> str: + def make_qualified_class_name(self, table_name: str) -> str: """Make a full Weaviate class name from a table name by prepending the dataset name if it exists. 
""" @@ -277,7 +283,7 @@ def make_full_name(self, table_name: str) -> str: def get_class_schema(self, table_name: str) -> Dict[str, Any]: """Get the Weaviate class schema for a table.""" return cast( - Dict[str, Any], self.db_client.schema.get(self.make_full_name(table_name)) + Dict[str, Any], self.db_client.schema.get(self.make_qualified_class_name(table_name)) ) def create_class( @@ -294,7 +300,7 @@ def create_class( updated_schema = class_schema.copy() updated_schema["class"] = ( - self.make_full_name(updated_schema["class"]) + self.make_qualified_class_name(updated_schema["class"]) if full_class_name is None else full_class_name ) @@ -311,7 +317,7 @@ def create_class_property( prop_schema: The property schema to create. """ self.db_client.schema.property.create( - self.make_full_name(class_name), prop_schema + self.make_qualified_class_name(class_name), prop_schema ) def delete_class(self, class_name: str) -> None: @@ -320,7 +326,7 @@ def delete_class(self, class_name: str) -> None: Args: class_name: The name of the class to delete. """ - self.db_client.schema.delete_class(self.make_full_name(class_name)) + self.db_client.schema.delete_class(self.make_qualified_class_name(class_name)) def delete_all_classes(self) -> None: """Delete all Weaviate classes from Weaviate instance and all data @@ -338,7 +344,7 @@ def query_class(self, class_name: str, properties: List[str]) -> GetBuilder: Returns: A Weaviate query builder. """ - return self.db_client.query.get(self.make_full_name(class_name), properties) + return self.db_client.query.get(self.make_qualified_class_name(class_name), properties) def create_object(self, obj: Dict[str, Any], class_name: str) -> None: """Create a Weaviate object. @@ -347,7 +353,7 @@ def create_object(self, obj: Dict[str, Any], class_name: str) -> None: obj: The object to create. class_name: The name of the class to create the object on. """ - self.db_client.data_object.create(obj, self.make_full_name(class_name)) + self.db_client.data_object.create(obj, self.make_qualified_class_name(class_name)) def drop_storage(self) -> None: """Drop the dataset from Weaviate instance. @@ -372,12 +378,12 @@ def drop_storage(self) -> None: if class_name in class_name_list: self.db_client.schema.delete_class(class_name) - self.delete_sentinel_class() + self._delete_sentinel_class() @wrap_weaviate_error def initialize_storage(self, truncate_tables: Iterable[str] = None) -> None: if not self.is_storage_initialized(): - self.create_sentinel_class() + self._create_sentinel_class() elif truncate_tables: for table_name in truncate_tables: try: @@ -400,11 +406,11 @@ def is_storage_initialized(self) -> bool: raise return True - def create_sentinel_class(self) -> None: + def _create_sentinel_class(self) -> None: """Create an empty class to indicate that the storage is initialized.""" self.create_class(NON_VECTORIZED_CLASS, full_class_name=self.sentinel_class) - def delete_sentinel_class(self) -> None: + def _delete_sentinel_class(self) -> None: """Delete the sentinel class.""" self.db_client.schema.delete_class(self.sentinel_class) @@ -436,6 +442,7 @@ def update_stored_schema( def _execute_schema_update(self, only_tables: Iterable[str]) -> None: for table_name in only_tables or self.schema.tables: exists, existing_columns = self.get_storage_table(table_name) + # TODO: detect columns where vectorization was added or removed and modify it. 
currently we ignore change of hints new_columns = self.schema.get_new_table_columns( table_name, existing_columns ) @@ -465,7 +472,7 @@ def get_storage_table(self, table_name: str) -> Tuple[bool, TTableSchemaColumns] # Convert Weaviate class schema to dlt table schema for prop in class_schema["properties"]: schema_c: TColumnSchema = { - "name": prop["name"], + "name": self.schema.naming.normalize_identifier(prop["name"]), "data_type": self._from_db_type(prop["dataType"][0]), } table_schema[prop["name"]] = schema_c @@ -562,7 +569,7 @@ def get_records(self, table_name: str, where: Dict[str, Any] = None, sort: Dict[ query = query.with_offset(offset) response = query.do() - full_class_name = self.make_full_name(table_name) + full_class_name = self.make_qualified_class_name(table_name) records = response["data"]["Get"][full_class_name] return cast(List[Dict[str, Any]],records) @@ -626,7 +633,7 @@ def start_file_load( file_path, db_client=self.db_client, client_config=self.config, - class_name=self.make_full_name(table["name"]), + class_name=self.make_qualified_class_name(table["name"]), ) def restore_file_load(self, file_path: str) -> LoadJob: @@ -672,4 +679,4 @@ def _to_db_type(sc_t: TDataType) -> str: @staticmethod def _from_db_type(wt_t: str) -> TDataType: - return WT_TO_SCT[wt_t] + return WT_TO_SCT.get(wt_t, "text") diff --git a/dlt/extract/extract.py b/dlt/extract/extract.py index d84895b308..721c8b3f0a 100644 --- a/dlt/extract/extract.py +++ b/dlt/extract/extract.py @@ -90,11 +90,11 @@ def _write_dynamic_table(resource: DltResource, item: TDataItem) -> None: table_name = resource._table_name_hint_fun(item) existing_table = dynamic_tables.get(table_name) if existing_table is None: - dynamic_tables[table_name] = [resource.table_schema(item)] + dynamic_tables[table_name] = [resource.compute_table_schema(item)] else: # quick check if deep table merge is required if resource._table_has_other_dynamic_hints: - new_table = resource.table_schema(item) + new_table = resource.compute_table_schema(item) # this merges into existing table in place utils.merge_tables(existing_table[0], new_table) else: @@ -106,7 +106,7 @@ def _write_dynamic_table(resource: DltResource, item: TDataItem) -> None: def _write_static_table(resource: DltResource, table_name: str) -> None: existing_table = dynamic_tables.get(table_name) if existing_table is None: - static_table = resource.table_schema() + static_table = resource.compute_table_schema() static_table["name"] = table_name dynamic_tables[table_name] = [static_table] diff --git a/dlt/extract/schema.py b/dlt/extract/schema.py index 709f5c8b0a..3149a37c12 100644 --- a/dlt/extract/schema.py +++ b/dlt/extract/schema.py @@ -5,12 +5,13 @@ from dlt.common.schema.utils import DEFAULT_WRITE_DISPOSITION, merge_columns, new_column, new_table from dlt.common.schema.typing import TColumnNames, TColumnProp, TColumnSchema, TPartialTableSchema, TTableSchemaColumns, TWriteDisposition, TAnySchemaColumns from dlt.common.typing import TDataItem +from dlt.common.utils import update_dict_nested from dlt.common.validation import validate_dict_ignoring_xkeys from dlt.extract.incremental import Incremental from dlt.extract.typing import TFunHintTemplate, TTableHintTemplate from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, InconsistentTableTemplate, TableNameMissing -from dlt.extract.utils import ensure_table_schema_columns_hint +from dlt.extract.utils import ensure_table_schema_columns, ensure_table_schema_columns_hint class TTableSchemaTemplate(TypedDict, 
total=False): @@ -46,16 +47,23 @@ def table_name(self, value: TTableHintTemplate[str]) -> None: self.apply_hints(table_name=value) @property - def write_disposition(self) -> TWriteDisposition: + def write_disposition(self) -> TTableHintTemplate[TWriteDisposition]: if self._table_schema_template is None or self._table_schema_template.get("write_disposition") is None: return DEFAULT_WRITE_DISPOSITION - w_d = self._table_schema_template.get("write_disposition") - if callable(w_d): - raise DataItemRequiredForDynamicTableHints(self._name) - else: - return w_d + return self._table_schema_template.get("write_disposition") + + @write_disposition.setter + def write_disposition(self, value: TTableHintTemplate[TWriteDisposition]) -> None: + self.apply_hints(write_disposition=value) + + @property + def columns(self) -> TTableHintTemplate[TTableSchemaColumns]: + """Gets columns schema that can be modified in place""" + if self._table_schema_template is None: + return None + return self._table_schema_template.get("columns") - def table_schema(self, item: TDataItem = None) -> TPartialTableSchema: + def compute_table_schema(self, item: TDataItem = None) -> TPartialTableSchema: """Computes the table schema based on hints and column definitions passed during resource creation. `item` parameter is used to resolve table hints based on data""" if not self._table_schema_template: return new_table(self._name, resource=self._name) @@ -102,8 +110,6 @@ def apply_hints( In non-aware resources, `dlt` will filter out the loaded values, however the resource will yield all the values again. """ t = None - if columns is not None: - columns = ensure_table_schema_columns_hint(columns) if not self._table_schema_template: # if there's no template yet, create and set new one t = self.new_table_template(table_name, parent_table_name, write_disposition, columns, primary_key, merge_key) @@ -114,7 +120,7 @@ def apply_hints( if table_name: t["name"] = table_name else: - t.pop("name", None) + t["name"] = self._name if parent_table_name is not None: if parent_table_name: t["parent"] = parent_table_name @@ -123,15 +129,34 @@ def apply_hints( if write_disposition: t["write_disposition"] = write_disposition if columns is not None: - t["columns"] = columns + # if callable then override existing + if callable(columns) or callable(t["columns"]): + t["columns"] = ensure_table_schema_columns_hint(columns) + elif columns: + # normalize columns + columns = ensure_table_schema_columns(columns) + # this updates all columns with defaults + t["columns"] = update_dict_nested(t["columns"], columns) + else: + t.pop("columns", None) + if primary_key is not None: - t["primary_key"] = primary_key + if primary_key: + t["primary_key"] = primary_key + else: + t.pop("primary_key", None) if merge_key is not None: - t["merge_key"] = merge_key - t["incremental"] = incremental + if merge_key: + t["merge_key"] = merge_key + else: + t.pop("merge_key", None) + + # set properties that cannot be passed to new_table_template + t["incremental"] = incremental self.set_template(t) def set_template(self, table_schema_template: TTableSchemaTemplate) -> None: + DltResourceSchema.validate_dynamic_hints(table_schema_template) # if "name" is callable in the template then the table schema requires actual data item to be inferred name_hint = table_schema_template["name"] if callable(name_hint): @@ -179,28 +204,29 @@ def new_table_template( table_name: TTableHintTemplate[str], parent_table_name: TTableHintTemplate[str] = None, write_disposition: 
TTableHintTemplate[TWriteDisposition] = None, - columns: TTableHintTemplate[TTableSchemaColumns] = None, + columns: TTableHintTemplate[TAnySchemaColumns] = None, primary_key: TTableHintTemplate[TColumnNames] = None, merge_key: TTableHintTemplate[TColumnNames] = None ) -> TTableSchemaTemplate: if not table_name: raise TableNameMissing() + if columns is not None: + columns = ensure_table_schema_columns_hint(columns) + if not callable(columns): + columns = columns.values() # type: ignore # create a table schema template where hints can be functions taking TDataItem - if isinstance(columns, C_Mapping): - # new_table accepts a sequence - column_list: List[TColumnSchema] = [] - for name, column in columns.items(): - column["name"] = name - column_list.append(column) - columns = column_list # type: ignore - new_template: TTableSchemaTemplate = new_table(table_name, parent_table_name, write_disposition=write_disposition, columns=columns) # type: ignore if primary_key: new_template["primary_key"] = primary_key if merge_key: new_template["merge_key"] = merge_key + DltResourceSchema.validate_dynamic_hints(new_template) + return new_template + + @staticmethod + def validate_dynamic_hints(template: TTableSchemaTemplate) -> None: + table_name = template["name"] # if any of the hints is a function then name must be as well - if any(callable(v) for k, v in new_template.items() if k != "name") and not callable(table_name): + if any(callable(v) for k, v in template.items() if k not in ["name", "incremental"]) and not callable(table_name): raise InconsistentTableTemplate(f"Table name {table_name} must be a function if any other table hint is a function") - return new_template diff --git a/dlt/extract/source.py b/dlt/extract/source.py index 53e54649b0..52a0381dfe 100644 --- a/dlt/extract/source.py +++ b/dlt/extract/source.py @@ -469,7 +469,10 @@ def extracted(self) -> Dict[str, DltResource]: resource = self.find_by_pipe(pipe) except KeyError: # resource for pipe not found: return mock resource - mock_template = DltResourceSchema.new_table_template(pipe.name, write_disposition=resource._table_schema_template.get("write_disposition")) + mock_template = DltResourceSchema.new_table_template( + pipe.name, + write_disposition=resource._table_schema_template.get("write_disposition") + ) resource = DltResource(pipe, mock_template, False, section=resource.section) resource.source_name = resource.source_name extracted[resource._name] = resource @@ -658,7 +661,9 @@ def discover_schema(self, item: TDataItem = None) -> Schema: for r in self.selected_resources.values(): # names must be normalized here with contextlib.suppress(DataItemRequiredForDynamicTableHints): - partial_table = self._schema.normalize_table_identifiers(r.table_schema(item)) + partial_table = self._schema.normalize_table_identifiers( + r.compute_table_schema(item) + ) schema.update_schema(partial_table) return schema diff --git a/dlt/extract/utils.py b/dlt/extract/utils.py index b4c59eb30c..04cb41299f 100644 --- a/dlt/extract/utils.py +++ b/dlt/extract/utils.py @@ -30,10 +30,9 @@ def ensure_table_schema_columns(columns: TAnySchemaColumns) -> TTableSchemaColum columns: A dict of column schemas, a list of column schemas, or a pydantic model """ if isinstance(columns, C_Mapping): - # Assume dict is already in the correct format - # but ensure the name key is set correctly - for name, column in columns.items(): - column["name"] = name + # fill missing names in short form was used + for col_name in columns: + columns[col_name]["name"] = col_name return 
columns elif isinstance(columns, Sequence): # Assume list of columns diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py index 9a0dc831b4..98eb6d408f 100644 --- a/dlt/pipeline/pipeline.py +++ b/dlt/pipeline/pipeline.py @@ -33,10 +33,9 @@ from dlt.common.utils import is_interactive from dlt.common.data_writers import TLoaderFileFormat -from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, SourceExhausted +from dlt.extract.exceptions import SourceExhausted from dlt.extract.extract import ExtractorStorage, extract_with_schema from dlt.extract.source import DltResource, DltSource -from dlt.extract.utils import ensure_table_schema_columns from dlt.normalize import Normalize from dlt.normalize.configuration import NormalizeConfiguration from dlt.destinations.sql_client import SqlClientBase @@ -792,10 +791,9 @@ def _data_to_sources(self, ) -> List[DltSource]: def apply_hint_args(resource: DltResource) -> None: - columns_dict = ensure_table_schema_columns(columns) if columns is not None else None # apply hints only if any of the hints is present, table_name must be always present if table_name or parent_table_name or write_disposition or columns or primary_key: - resource.apply_hints(table_name or resource.table_name or resource.name, parent_table_name, write_disposition, columns_dict, primary_key) + resource.apply_hints(table_name or resource.table_name or resource.name, parent_table_name, write_disposition, columns, primary_key) def choose_schema() -> Schema: """Except of explicitly passed schema, use a clone that will get discarded if extraction fails""" @@ -876,7 +874,9 @@ def _extract_source(self, storage: ExtractorStorage, source: DltSource, max_para # get the current schema and merge tables from source_schema # note we are not merging props like max nesting or column propagation for table in source_schema.data_tables(include_incomplete=True): - pipeline_schema.update_schema(pipeline_schema.normalize_table_identifiers(table)) + pipeline_schema.update_schema( + pipeline_schema.normalize_table_identifiers(table) + ) return extract_id diff --git a/docs/website/docs/dlt-ecosystem/destinations/athena.md b/docs/website/docs/dlt-ecosystem/destinations/athena.md index b64966c466..275e36736e 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/athena.md +++ b/docs/website/docs/dlt-ecosystem/destinations/athena.md @@ -63,10 +63,10 @@ region_name="please set me up!" # set your aws region, for example "eu-central-1 if you have your credentials stored in `~/.aws/credentials` just remove the **[destination.filesystem.credentials]** and **[destination.athena.credentials]** section above and `dlt` will fall back to your **default** profile in local credentials. If you want to switch the profile, pass the profile name as follows (here: `dlt-ci-user`): ```toml [destination.filesystem.credentials] -aws_profile="dlt-ci-user" +profile_name="dlt-ci-user" [destination.athena.credentials] -aws_profile="dlt-ci-user" +profile_name="dlt-ci-user" ``` ## Additional Destination Configuration @@ -92,7 +92,7 @@ scanning your bucket and reading all relevant parquet files in there. `dlt` internal tables are saved as Iceberg tables. ### Data types -Athena tables store timestamps with millisecond precision and with that precision we generate parquet files. Mind that Iceberg tables have microsecond precision. +Athena tables store timestamps with millisecond precision and with that precision we generate parquet files. Mind that Iceberg tables have microsecond precision. 
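Coming back to the `apply_hints` changes in `dlt/extract/schema.py` above, a sketch of how incremental hint updates are expected to behave under the new merging logic; the resource name, column names and hint values are illustrative:

```py
import dlt

resource = dlt.resource(
    [{"id": 1, "value": "a"}],
    name="items",
    primary_key="id",
    columns={"id": {"data_type": "bigint"}},
)

# column hints are now merged into the existing template instead of replacing it,
# so the "id" column declared on creation is preserved
resource.apply_hints(columns={"value": {"data_type": "text", "nullable": False}})
# passing an empty value clears a hint, e.g. an empty tuple removes the primary key
resource.apply_hints(primary_key=())

table = resource.compute_table_schema()
assert set(table["columns"]) == {"id", "value"}
assert "primary_key" not in table
```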
Athena does not support JSON fields so JSON is stored as string. diff --git a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md index 770e529de7..8db9a35514 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/filesystem.md +++ b/docs/website/docs/dlt-ecosystem/destinations/filesystem.md @@ -44,7 +44,7 @@ aws_secret_access_key = "please set me up!" # copy the secret access key here If you have your credentials stored in `~/.aws/credentials` just remove the **[destination.filesystem.credentials]** section above and `dlt` will fall back to your **default** profile in local credentials. If you want to switch the profile, pass the profile name as follows (here: `dlt-ci-user`): ```toml [destination.filesystem.credentials] -aws_profile="dlt-ci-user" +profile_name="dlt-ci-user" ``` You can also pass an aws region: diff --git a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md index ca0477490b..919ed3cff5 100644 --- a/docs/website/docs/dlt-ecosystem/destinations/weaviate.md +++ b/docs/website/docs/dlt-ecosystem/destinations/weaviate.md @@ -221,13 +221,33 @@ Here's a summary of the naming normalization approach: #### Property names - Snake case and camel case remain unchanged: `snake_case_name` and `camelCaseName`. -- Names with multiple underscores, such as Snake-______c__ase_, are compacted to Snake_c_asex. Except for the case when underscores are leading, in which case they are kept: `___snake_case_name` becomes `___snake_case_name`. +- Names starting with a capital letter have it lowercased: `CamelCase` -> `camelCase` +- Names with multiple underscores, such as `Snake-______c__ase_``, are compacted to `snake_c_asex`. Except for the case when underscores are leading, in which case they are kept: `___snake_case_name` becomes `___snake_case_name`. - Names starting with a number are prefixed with a "p_". For example, `123snake_case_name` becomes `p_123snake_case_name`. #### Reserved property names Reserved property names like `id` or `additional` are prefixed with underscores for differentiation. Therefore, `id` becomes `__id` and `_id` is rendered as `___id`. +### Case insensitive naming convention +The default naming convention described above will preserve the casing of the properties (besides the first letter which is lowercased). This generates nice classes +in Weaviate but also requires that your input data does not have clashing property names when comparing case insensitive ie. (`caseName` == `casename`). In such case +Weaviate destination will fail to create classes and report a conflict. + +You can configure alternative naming convention which will lowercase all properties. The clashing properties will be merged and the classes created. Still if you have a document where clashing properties like: +```json +{"camelCase": 1, "CamelCase": 2} +``` +it will be normalized to: +``` +{"camelcase": 2} +``` +so your best course of action is to clean up the data yourself before loading and use default naming convention. Nevertheless you can configure the alternative in `config.toml`: +```toml +[schema] +naming="dlt.destinations.weaviate.naming" +``` + ## Additional destination options - `batch_size`: (int) the number of items in the batch insert request. The default is 100. @@ -273,4 +293,4 @@ Currently Weaviate destination does not support dbt. ### Syncing of `dlt` state -Weaviate destination does not support syncing of the `dlt` state. 
+Weaviate destination supports syncing of the `dlt` state. diff --git a/tests/common/cases/schemas/eth/ethereum_schema_v6.yml b/tests/common/cases/schemas/eth/ethereum_schema_v6.yml index a8445645ff..e7f5e563ce 100644 --- a/tests/common/cases/schemas/eth/ethereum_schema_v6.yml +++ b/tests/common/cases/schemas/eth/ethereum_schema_v6.yml @@ -1,5 +1,5 @@ version: 13 -version_hash: ++bJOVuScYYoVUFtjmZMBV+cxsWs8irYHIMV8J1xD5g= +version_hash: Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ= engine_version: 6 name: ethereum tables: diff --git a/tests/common/schema/test_inference.py b/tests/common/schema/test_inference.py index a5436530ce..8f17863c1b 100644 --- a/tests/common/schema/test_inference.py +++ b/tests/common/schema/test_inference.py @@ -54,7 +54,6 @@ def test_map_column_preferred_type(schema: Schema) -> None: assert schema._infer_column_type("AA", "confidence", skip_preferred=True) == "text" - def test_map_column_type(schema: Schema) -> None: # default mappings assert schema._infer_column_type("18271.11", "_column_name") == "text" @@ -498,3 +497,32 @@ def __call__(self) -> Any: assert c_row["evm2"] == 22.2 assert isinstance(c_row["evm2"], float) + +def test_infer_on_incomplete_column(schema: Schema) -> None: + # if incomplete column is present, dlt still infers column schema from the data + # but overrides it with incomplete column + incomplete_col = utils.new_column("I", nullable=False) + incomplete_col["primary_key"] = True + incomplete_col["x-special"] = "spec" + table = utils.new_table("table", columns=[incomplete_col]) + schema.update_schema(table) + # make sure that column is still incomplete and has no default hints + assert schema.get_table("table")["columns"]["I"] == { + 'name': 'I', + 'nullable': False, + 'primary_key': True, + 'x-special': 'spec' + } + + timestamp_float = 78172.128 + # add new column with preferred + row_1 = {"timestamp": timestamp_float, "confidence": "0.1", "I": "0xFF", "number": Decimal("128.67")} + _, new_table = schema.coerce_row("table", None, row_1) + assert "I" in new_table["columns"] + i_column = new_table["columns"]["I"] + assert utils.is_complete_column(i_column) + # has default hints and overrides + assert i_column["nullable"] is False + assert i_column["x-special"] == "spec" + assert i_column["primary_key"] is True + assert i_column["data_type"] == "text" diff --git a/tests/common/schema/test_merges.py b/tests/common/schema/test_merges.py new file mode 100644 index 0000000000..2856923319 --- /dev/null +++ b/tests/common/schema/test_merges.py @@ -0,0 +1,301 @@ +import pytest +from copy import copy, deepcopy + +from dlt.common.schema import Schema, utils +from dlt.common.schema.exceptions import CannotCoerceColumnException, CannotCoerceNullException, TablePropertiesConflictException +from dlt.common.schema.typing import TStoredSchema, TTableSchema + + +COL_1_HINTS = { + "cluster": False, + "foreign_key": True, + "data_type": "text", + "name": "test", + "x-special": True, + "x-special-int": 100, + "nullable": False, + "x-special-bool": False, + "prop": None + } + +COL_1_HINTS_DEFAULTS = { + 'foreign_key': True, + 'data_type': 'text', + 'name': 'test', + 'x-special': True, + 'x-special-int': 100, + 'nullable': False, + "x-special-bool": False, + } + +COL_2_HINTS = { + "nullable": True, + "name": "test_2", + "primary_key": False +} + + +def test_check_column_defaults() -> None: + assert utils.has_default_column_hint_value("data_type", "text") is False + assert utils.has_default_column_hint_value("name", 123) is False + assert 
utils.has_default_column_hint_value("nullable", True) is True + assert utils.has_default_column_hint_value("nullable", False) is False + assert utils.has_default_column_hint_value("x-special", False) is False + assert utils.has_default_column_hint_value("unique", False) is True + assert utils.has_default_column_hint_value("unique", True) is False + + +def test_column_remove_defaults() -> None: + clean = utils.remove_column_defaults(copy(COL_1_HINTS)) + # mind that nullable default is False and Nones will be removed + assert clean == COL_1_HINTS_DEFAULTS + # check nullable True + assert utils.remove_column_defaults(copy(COL_2_HINTS)) == {"name": "test_2"} + + +def test_column_add_defaults() -> None: + # test complete column + full = utils.add_column_defaults(copy(COL_1_HINTS)) + assert full["unique"] is False + # remove defaults from full + clean = utils.remove_column_defaults(copy(full)) + assert clean == COL_1_HINTS_DEFAULTS + # prop is None and will be removed + del full["prop"] + assert utils.add_column_defaults(copy(clean)) == full + + # test incomplete + complete_full = utils.add_column_defaults(copy(COL_2_HINTS)) + # defaults are added + assert complete_full["unique"] is False + + +def test_remove_defaults_stored_schema() -> None: + table: TTableSchema = { + "name": "table", + "parent": "parent", + "description": "description", + "resource": "🦚Table", + "x-special": 128, + "columns": { + "test": COL_1_HINTS, + "test_2": COL_2_HINTS + } + } + stored_schema: TStoredSchema = { + "name": "schema", + "tables": { + "table": deepcopy(table), + "table_copy": deepcopy(table) + }, + "x-top-level": True + } + # mock the case in table_copy where resource == table_name + stored_schema["tables"]["table_copy"]["resource"] = stored_schema["tables"]["table_copy"]["name"] = "table_copy" + + default_stored = utils.remove_defaults(stored_schema) + # nullability always present + assert default_stored["tables"]["table"]["columns"]["test"]["nullable"] is False + assert default_stored["tables"]["table"]["columns"]["test_2"]["nullable"] is True + # not removed in complete column (as it was explicitly set to False) + assert default_stored["tables"]["table"]["columns"]["test"]["cluster"] is False + # not removed in incomplete one + assert default_stored["tables"]["table"]["columns"]["test_2"]["primary_key"] is False + # resource present + assert default_stored["tables"]["table"]["resource"] == "🦚Table" + # resource removed because identical to table name + assert "resource" not in default_stored["tables"]["table_copy"] + + # apply defaults + restored_schema = utils.apply_defaults(deepcopy(default_stored)) + # drop all names - they are added + del restored_schema["tables"]["table"]["name"] + del restored_schema["tables"]["table"]["columns"]["test"]["name"] + del restored_schema["tables"]["table"]["columns"]["test_2"]["name"] + del restored_schema["tables"]["table_copy"]["name"] + del restored_schema["tables"]["table_copy"]["columns"]["test"]["name"] + del restored_schema["tables"]["table_copy"]["columns"]["test_2"]["name"] + assert stored_schema == restored_schema + + +def test_new_incomplete_column() -> None: + # no data_type so incomplete + incomplete_col = utils.new_column("I", nullable=False) + assert utils.is_complete_column(incomplete_col) is False + # default hints not there + assert "primary_key" not in incomplete_col + + incomplete_col["primary_key"] = True + incomplete_col["x-special"] = "spec" + table = utils.new_table("table", columns=[incomplete_col]) + # incomplete column must be added without 
hints + assert table["columns"]["I"]["primary_key"] is True + assert table["columns"]["I"]["x-special"] == "spec" + assert "merge_key" not in incomplete_col + + +def test_merge_columns() -> None: + # tab_b overrides non default + col_a = utils.merge_columns(copy(COL_1_HINTS), copy(COL_2_HINTS), merge_defaults=False) + # nullable is False - tab_b has it as default and those are not merged + assert col_a == { + "name": "test_2", + "nullable": False, + 'cluster': False, + 'foreign_key': True, + 'data_type': 'text', + 'x-special': True, + 'x-special-int': 100, + 'x-special-bool': False, + 'prop': None + } + + col_a = utils.merge_columns(copy(COL_1_HINTS), copy(COL_2_HINTS), merge_defaults=True) + # nullable is True and primary_key is present - default values are merged + assert col_a == { + "name": "test_2", + "nullable": True, + 'cluster': False, + 'foreign_key': True, + 'data_type': 'text', + 'x-special': True, + 'x-special-int': 100, + 'x-special-bool': False, + 'prop': None, + 'primary_key': False + } + + +def test_diff_tables() -> None: + table: TTableSchema = { + "name": "table", + "description": "description", + "resource": "🦚Table", + "x-special": 128, + "columns": { + "test": COL_1_HINTS, + "test_2": COL_2_HINTS + } + } + empty = utils.new_table("table") + del empty["resource"] + print(empty) + partial = utils.diff_tables(empty, deepcopy(table)) + # partial is simply table + assert partial == table + partial = utils.diff_tables(deepcopy(table), empty) + # partial is empty + assert partial == empty + + # override name and description + changed = deepcopy(table) + changed["description"] = "new description" + changed["name"] = "new name" + partial = utils.diff_tables(deepcopy(table), changed) + print(partial) + assert partial == { + "name": "new name", + "description": "new description", + "columns": {} + } + + # ignore identical table props + existing = deepcopy(table) + changed["write_disposition"] = "append" + partial = utils.diff_tables(deepcopy(existing), changed) + assert partial == { + "name": "new name", + "description": "new description", + "write_disposition": "append", + "columns": {} + } + existing["write_disposition"] = "append" + partial = utils.diff_tables(deepcopy(existing), changed) + assert partial == { + "name": "new name", + "description": "new description", + "columns": {} + } + + # detect changed column + existing = deepcopy(table) + changed = deepcopy(table) + changed["columns"]["test"]["cluster"] = True + partial = utils.diff_tables(existing, changed) + assert "test" in partial["columns"] + assert "test_2" not in partial["columns"] + assert existing["columns"]["test"] == table["columns"]["test"] != partial["columns"]["test"] + + # defaults are not ignored + existing = deepcopy(table) + changed = deepcopy(table) + changed["columns"]["test"]["foreign_key"] = False + partial = utils.diff_tables(existing, changed) + assert "test" in partial["columns"] + + # even if not present in tab_a at all + existing = deepcopy(table) + changed = deepcopy(table) + changed["columns"]["test"]["foreign_key"] = False + del existing["columns"]["test"]["foreign_key"] + partial = utils.diff_tables(existing, changed) + assert "test" in partial["columns"] + + +def test_diff_tables_conflicts() -> None: + # conflict on parents + table: TTableSchema = { + "name": "table", + "parent": "parent", + "description": "description", + "x-special": 128, + "columns": { + "test": COL_1_HINTS, + "test_2": COL_2_HINTS + } + } + + other = utils.new_table("table_2") + with 
pytest.raises(TablePropertiesConflictException) as cf_ex: + utils.diff_tables(table, other) + assert cf_ex.value.table_name == "table" + assert cf_ex.value.prop_name == "parent" + + # conflict on data types in columns + changed = deepcopy(table) + changed["columns"]["test"]["data_type"] = "bigint" + with pytest.raises(CannotCoerceColumnException): + utils.diff_tables(table, changed) + + +def test_merge_tables() -> None: + table: TTableSchema = { + "name": "table", + "description": "description", + "resource": "🦚Table", + "x-special": 128, + "columns": { + "test": COL_1_HINTS, + "test_2": COL_2_HINTS + } + } + changed = deepcopy(table) + changed["x-special"] = 129 + changed["description"] = "new description" + changed["new-prop-1"] = "A" + changed["new-prop-2"] = None + changed["new-prop-3"] = False + # drop column so partial has it + del table["columns"]["test"] + partial = utils.merge_tables(table, changed) + assert "test" in table["columns"] + assert table["x-special"] == 129 + assert table["description"] == "new description" + assert table["new-prop-1"] == "A" + # None are not merged in + assert "new-prop-2" not in table + assert table["new-prop-3"] is False + + # one column in partial + assert len(partial["columns"]) == 1 + assert partial["columns"]["test"] == COL_1_HINTS diff --git a/tests/common/schema/test_schema.py b/tests/common/schema/test_schema.py index 5d16b3f57f..e094f26945 100644 --- a/tests/common/schema/test_schema.py +++ b/tests/common/schema/test_schema.py @@ -189,8 +189,8 @@ def test_schema_descriptions_and_annotations(schema_storage: SchemaStorage): schema.tables["blocks"]["x-annotation"] += "Saved" schema.tables["blocks"]["columns"]["_dlt_load_id"]["description"] += "Saved" schema.tables["blocks"]["columns"]["_dlt_load_id"]["x-column-annotation"] += "Saved" + schema_storage.save_schema(schema) - print(schema_storage.save_schema(schema)) loaded_schema = schema_storage.load_schema("event") assert loaded_schema.tables["blocks"]["description"].endswith("Saved") assert loaded_schema.tables["blocks"]["x-annotation"].endswith("Saved") @@ -247,6 +247,23 @@ def test_save_store_schema_custom_normalizers(cn_schema: Schema, schema_storage: assert_new_schema_values_custom_normalizers(schema_copy) +def test_save_load_incomplete_column(schema: Schema, schema_storage_no_import: SchemaStorage) -> None: + # make sure that incomplete column is saved and restored without default hints + incomplete_col = utils.new_column("I", nullable=False) + incomplete_col["primary_key"] = True + incomplete_col["x-special"] = "spec" + table = utils.new_table("table", columns=[incomplete_col]) + schema.update_schema(table) + schema_storage_no_import.save_schema(schema) + schema_copy = schema_storage_no_import.load_schema("event") + assert schema_copy.get_table("table")["columns"]["I"] == { + 'name': 'I', + 'nullable': False, + 'primary_key': True, + 'x-special': 'spec' + } + + def test_upgrade_engine_v1_schema() -> None: schema_dict: DictStrAny = load_json_case("schemas/ev1/event.schema") # ensure engine v1 @@ -504,6 +521,36 @@ def test_normalize_table_identifiers() -> None: assert schema.tables["issues"] == schema.normalize_table_identifiers(schema.normalize_table_identifiers(issues_table)) +def test_normalize_table_identifiers_merge_columns() -> None: + # create conflicting columns + table_create = [ + { + "name": "case", + "data_type": "bigint", + "nullable": False, + "x-description": "desc" + }, + { + "name": "Case", + "data_type": "double", + "nullable": True, + "primary_key": True + }, + ] + # 
schema normalizing to snake case will conflict on case and Case + table = utils.new_table("blend", columns=table_create) + norm_table = Schema("norm").normalize_table_identifiers(table) + # only one column + assert len(norm_table["columns"]) == 1 + assert norm_table["columns"]["case"] == { + 'nullable': False, # remove default, preserve non default + 'primary_key': True, + 'name': 'case', + 'data_type': 'double', + 'x-description': 'desc' + } + + def assert_new_schema_values_custom_normalizers(schema: Schema) -> None: # check normalizers config assert schema._normalizers_config["names"] == "tests.common.normalizers.custom_normalizers" diff --git a/tests/common/utils.py b/tests/common/utils.py index 7a49a80efb..c4ce9ddf67 100644 --- a/tests/common/utils.py +++ b/tests/common/utils.py @@ -15,7 +15,7 @@ COMMON_TEST_CASES_PATH = "./tests/common/cases/" # for import schema tests, change when upgrading the schema version -IMPORTED_VERSION_HASH_ETH_V6 = "++bJOVuScYYoVUFtjmZMBV+cxsWs8irYHIMV8J1xD5g=" +IMPORTED_VERSION_HASH_ETH_V6 = "Q/LxiP7taycE+u9PQNb2wiit+G5GntiifOUK2CFM3sQ=" # test sentry DSN TEST_SENTRY_DSN = "https://797678dd0af64b96937435326c7d30c1@o1061158.ingest.sentry.io/4504306172821504" # preserve secrets path to be able to restore it @@ -41,11 +41,11 @@ def yml_case_path(name: str) -> str: def row_to_column_schemas(row: StrAny) -> TTableSchemaColumns: - return {k: utils.add_missing_hints({ + return {k: { "name": k, "data_type": "text", "nullable": False - }) for k in row.keys()} + } for k in row.keys()} @pytest.fixture(autouse=True) diff --git a/tests/conftest.py b/tests/conftest.py index d084e3f3af..16fd3999b3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -68,3 +68,7 @@ def _create_pipeline_instance_id(self) -> str: # disable snowflake logging for log in ["snowflake.connector.cursor", "snowflake.connector.connection"]: logging.getLogger(log).setLevel("ERROR") + + # disable azure logging + for log in ["azure.core.pipeline.policies.http_logging_policy"]: + logging.getLogger(log).setLevel("ERROR") diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py index b4e7c334e6..b032c3f817 100644 --- a/tests/extract/test_decorators.py +++ b/tests/extract/test_decorators.py @@ -14,10 +14,10 @@ from dlt.common.pipeline import StateInjectableContext, TPipelineState from dlt.common.source import _SOURCES from dlt.common.schema import Schema -from dlt.common.schema.utils import new_table +from dlt.common.schema.utils import new_table, new_column from dlt.cli.source_detection import detect_source_configs -from dlt.extract.exceptions import ExplicitSourceNameInvalid, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, ParametrizedResourceUnbound, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable +from dlt.extract.exceptions import DataItemRequiredForDynamicTableHints, ExplicitSourceNameInvalid, InconsistentTableTemplate, InvalidResourceDataTypeFunctionNotAGenerator, InvalidResourceDataTypeIsNone, ParametrizedResourceUnbound, PipeNotBoundToData, ResourceFunctionExpected, ResourceInnerCallableConfigWrapDisallowed, SourceDataIsNone, SourceIsAClassTypeError, SourceNotAFunction, SourceSchemaNotAvailable from dlt.extract.source import DltResource, DltSource from dlt.common.schema.exceptions import InvalidSchemaName @@ -178,26 +178,125 @@ def test_columns_argument() -> None: def get_users(): yield {"u": "u", "tags": [1, 2 
,3]} - t = get_users().table_schema() - # nullable is added - assert t["columns"]["tags"]["nullable"] is True + t = get_users().compute_table_schema() + + assert "nullable" not in t["columns"]["tags"] assert t["columns"]["tags"]["data_type"] == "complex" assert t["columns"]["tags"]["x-extra"] == "x-annotation" r = get_users() r.apply_hints(columns={"invalid": {"data_type": "unk", "wassup": False}}) with pytest.raises(DictValidationException): - r.table_schema() + r.compute_table_schema() r = get_users() r.apply_hints(columns={"tags": {"x-second-extra": "x-second-annotation"}}) - t = r.table_schema() + t = r.compute_table_schema() assert t["columns"]["tags"]["x-second-extra"] == "x-second-annotation" # make sure column name was set assert t["columns"]["tags"]["name"] == "tags" +def test_apply_hints_columns() -> None: + @dlt.resource(name="user", columns={"tags": {"data_type": "complex", "primary_key": True}}) + def get_users(): + yield {"u": "u", "tags": [1, 2 ,3]} + + users = get_users() + assert users.columns == {"tags": {"data_type": "complex", "name": "tags", "primary_key": True}} + assert users.columns["tags"] == users.compute_table_schema()["columns"]["tags"] + + # columns property can be changed in place + users.columns["tags"]["data_type"] = "text" + assert users.compute_table_schema()["columns"]["tags"]["data_type"] == "text" + + # apply column definition - it should be merged with defaults + users.apply_hints(columns={"tags": {"primary_key": False, "data_type": "text"}, "things": new_column("things", nullable=False)}) + assert users.columns["tags"] == {"data_type": "text", "name": "tags", "primary_key": False} + assert users.columns["things"] == {"name": "things", "nullable": False} + + # delete columns by passing empty + users.apply_hints(columns={}) + assert users.columns is None + + +def test_apply_hints() -> None: + @dlt.resource + def empty(): + yield [1, 2, 3] + + empty_r = empty() + assert empty_r.write_disposition == "append" + empty_r.apply_hints(write_disposition="replace") + assert empty_r.write_disposition == "replace" + empty_r.write_disposition = "merge" + assert empty_r.compute_table_schema()["write_disposition"] == "merge" + # delete hint + empty_r.apply_hints(write_disposition="") + empty_r.write_disposition = "append" + assert empty_r.compute_table_schema()["write_disposition"] == "append" + + empty_r.apply_hints(table_name="table", parent_table_name="parent", primary_key=["a", "b"], merge_key=["c", "a"]) + table = empty_r.compute_table_schema() + assert table["columns"]["a"] == {'merge_key': True, 'name': 'a', 'nullable': False, 'primary_key': True} + assert table["columns"]["b"] == {'name': 'b', 'nullable': False, 'primary_key': True} + assert table["columns"]["c"] == {'merge_key': True, 'name': 'c', 'nullable': False} + assert table["name"] == "table" + assert table["parent"] == "parent" + + # reset + empty_r.apply_hints(table_name="", parent_table_name="", primary_key=[], merge_key="") + table = empty_r.compute_table_schema() + assert table["name"] == "empty" + assert "parent" not in table + assert table["columns"] == {} + + # combine columns with primary key + + empty_r = empty() + empty_r.apply_hints(columns={"tags": {"data_type": "complex", "primary_key": False}}, primary_key="tags", merge_key="tags") + # primary key not set here + assert empty_r.columns["tags"] == {"data_type": "complex", "name": "tags", "primary_key": False} + # only in the computed table + assert empty_r.compute_table_schema()["columns"]["tags"] == {"data_type": "complex", "name": 
"tags", "primary_key": True, "merge_key": True} + + +def test_apply_dynamic_hints() -> None: + @dlt.resource + def empty(): + yield [1, 2, 3] + + empty_r = empty() + with pytest.raises(InconsistentTableTemplate): + empty_r.apply_hints(parent_table_name=lambda ev: ev["p"]) + + empty_r.apply_hints(table_name=lambda ev: ev["t"], parent_table_name=lambda ev: ev["p"]) + assert empty_r._table_name_hint_fun is not None + assert empty_r._table_has_other_dynamic_hints is True + + with pytest.raises(DataItemRequiredForDynamicTableHints): + empty_r.compute_table_schema() + table = empty_r.compute_table_schema({"t": "table", "p": "parent"}) + assert table["name"] == "table" + assert table["parent"] == "parent" + + # try write disposition and primary key + empty_r.apply_hints(primary_key=lambda ev: ev["pk"], write_disposition=lambda ev: ev["wd"]) + table = empty_r.compute_table_schema({"t": "table", "p": "parent", "pk": ["a", "b"], "wd": "skip"}) + assert table["write_disposition"] == "skip" + assert "a" in table["columns"] + + # validate fails + with pytest.raises(DictValidationException): + empty_r.compute_table_schema({"t": "table", "p": "parent", "pk": ["a", "b"], "wd": "x-skip"}) + + # dynamic columns + empty_r.apply_hints(columns=lambda ev: ev["c"]) + table = empty_r.compute_table_schema({"t": "table", "p": "parent", "pk": ["a", "b"], "wd": "skip", "c": [{"name": "tags"}]}) + assert table["columns"]["tags"] == {"name": "tags"} + + def test_columns_from_pydantic() -> None: class Columns(BaseModel): tags: List[str] @@ -207,7 +306,7 @@ class Columns(BaseModel): def get_users() -> Iterator[Dict[str, Any]]: yield None - t = get_users().table_schema() + t = get_users().compute_table_schema() assert t["columns"]["tags"]["nullable"] is False assert t["columns"]["tags"]["data_type"] == "complex" @@ -222,7 +321,7 @@ class Columns2(BaseModel): r = get_users() r.apply_hints(columns=Columns2) - t = r.table_schema() + t = r.compute_table_schema() assert t["columns"]["a"]["nullable"] is False assert t["columns"]["a"]["data_type"] == "bigint" assert t["columns"]["b"]["nullable"] is False @@ -234,8 +333,8 @@ class Columns3(BaseModel): b: float r = get_users() - r.apply_hints(columns=lambda item: Columns3) - t = r.table_schema() + r.apply_hints(table_name=lambda item: "table", columns=lambda item: Columns3) + t = r.compute_table_schema({}) assert t["columns"]["a"]["nullable"] is False assert t["columns"]["a"]["data_type"] == "complex" @@ -558,7 +657,7 @@ def invalid_disposition(): r = invalid_disposition() with pytest.raises(DictValidationException) as py_ex: - r.table_schema() + r.compute_table_schema() assert "write_disposition" in str(py_ex.value) diff --git a/tests/extract/test_sources.py b/tests/extract/test_sources.py index a20d87d3e1..9a43c6ff39 100644 --- a/tests/extract/test_sources.py +++ b/tests/extract/test_sources.py @@ -690,7 +690,7 @@ def source_r_in_r(): # this will return internal resource r_i = s.res_in_res("table", "merge") assert r_i.name == "table" - assert r_i.table_schema()["write_disposition"] == "merge" + assert r_i.compute_table_schema()["write_disposition"] == "merge" assert list(r_i("ABC")) == ["A", "B", "C"] diff --git a/tests/load/bigquery/test_bigquery_table_builder.py b/tests/load/bigquery/test_bigquery_table_builder.py index 0ab691aa2f..9c2ab84904 100644 --- a/tests/load/bigquery/test_bigquery_table_builder.py +++ b/tests/load/bigquery/test_bigquery_table_builder.py @@ -106,22 +106,3 @@ def test_double_partition_exception(gcp_client: BigQueryClient) -> None: 
gcp_client._get_table_update_sql("event_test_table", mod_update, False) assert excc.value.columns == ["`col4`", "`col5`"] - -def test_partition_alter_table_exception(gcp_client: BigQueryClient) -> None: - mod_update = deepcopy(TABLE_UPDATE) - # timestamp - mod_update[3]["partition"] = True - # double partition - with pytest.raises(DestinationSchemaWillNotUpdate) as excc: - gcp_client._get_table_update_sql("event_test_table", mod_update, True) - assert excc.value.columns == ["`col4`"] - - -def test_cluster_alter_table_exception(gcp_client: BigQueryClient) -> None: - mod_update = deepcopy(TABLE_UPDATE) - # timestamp - mod_update[3]["cluster"] = True - # double cluster - with pytest.raises(DestinationSchemaWillNotUpdate) as excc: - gcp_client._get_table_update_sql("event_test_table", mod_update, True) - assert excc.value.columns == ["`col4`"] diff --git a/tests/load/filesystem/test_aws_credentials.py b/tests/load/filesystem/test_aws_credentials.py index fc7a158037..83cf4b2cab 100644 --- a/tests/load/filesystem/test_aws_credentials.py +++ b/tests/load/filesystem/test_aws_credentials.py @@ -1,12 +1,12 @@ import pytest from typing import Dict - +from dlt.common.utils import digest128 from dlt.common.configuration import resolve_configuration from dlt.common.configuration.specs.aws_credentials import AwsCredentials from dlt.common.configuration.specs.exceptions import InvalidBoto3Session -from tests.common.configuration.utils import environment +from tests.common.configuration.utils import environment from tests.load.utils import ALL_FILESYSTEM_DRIVERS from tests.utils import preserve_environ, autouse_test_storage @@ -89,6 +89,24 @@ def test_aws_credentials_from_boto3(environment: Dict[str, str]) -> None: assert c.aws_access_key_id == "fake_access_key" +@pytest.mark.skipif('s3' not in ALL_FILESYSTEM_DRIVERS, reason='s3 filesystem driver not configured') +def test_aws_credentials_for_profile(environment: Dict[str, str]) -> None: + import botocore.exceptions + + c = AwsCredentials() + c.profile_name = "dlt-ci-user2" + with pytest.raises(botocore.exceptions.ProfileNotFound): + c = resolve_configuration(c) + + c = AwsCredentials() + c.profile_name = "dlt-ci-user" + try: + c = resolve_configuration(c) + assert digest128(c.aws_access_key_id) == 'S3r3CtEf074HjqVeHKj/' + except botocore.exceptions.ProfileNotFound: + pytest.skip("This test requires dlt-ci-user aws profile to be present") + + def set_aws_credentials_env(environment: Dict[str, str]) -> None: environment['AWS_ACCESS_KEY_ID'] = 'fake_access_key' environment['AWS_SECRET_ACCESS_KEY'] = 'fake_secret_key' diff --git a/tests/load/pipeline/test_restore_state.py b/tests/load/pipeline/test_restore_state.py index 6e377cc59f..0b08cd3a52 100644 --- a/tests/load/pipeline/test_restore_state.py +++ b/tests/load/pipeline/test_restore_state.py @@ -55,11 +55,11 @@ def test_restore_state_utils(destination_config: DestinationTestConfiguration) - # add _dlt_id and _dlt_load_id resource = state_resource(initial_state) resource.apply_hints(columns={ - "_dlt_id": utils.add_missing_hints({"name": "_dlt_id", "data_type": "text", "nullable": False}), - "_dlt_load_id": utils.add_missing_hints({"name": "_dlt_load_id", "data_type": "text", "nullable": False}), + "_dlt_id": {"name": "_dlt_id", "data_type": "text", "nullable": False}, + "_dlt_load_id": {"name": "_dlt_load_id", "data_type": "text", "nullable": False}, **STATE_TABLE_COLUMNS }) - schema.update_schema(schema.normalize_table_identifiers(resource.table_schema())) + 
schema.update_schema(schema.normalize_table_identifiers(resource.compute_table_schema())) # do not bump version here or in sync_schema, dlt won't recognize that schema changed and it won't update it in storage # so dlt in normalize stage infers _state_version table again but with different column order and the column order in schema is different # then in database. parquet is created in schema order and in Redshift it must exactly match the order. diff --git a/tests/load/redshift/test_redshift_table_builder.py b/tests/load/redshift/test_redshift_table_builder.py index 16ef6f8a76..c991844679 100644 --- a/tests/load/redshift/test_redshift_table_builder.py +++ b/tests/load/redshift/test_redshift_table_builder.py @@ -90,12 +90,3 @@ def test_create_table_with_hints(client: RedshiftClient) -> None: # no hints assert '"col3" boolean NOT NULL' in sql assert '"col4" timestamp with time zone NOT NULL' in sql - - -def test_hint_alter_table_exception(client: RedshiftClient) -> None: - mod_update = deepcopy(TABLE_UPDATE) - # timestamp - mod_update[3]["sort"] = True - with pytest.raises(DestinationSchemaWillNotUpdate) as excc: - client._get_table_update_sql("event_test_table", mod_update, True) - assert excc.value.columns == ['"col4"'] diff --git a/tests/load/snowflake/test_snowflake_table_builder.py b/tests/load/snowflake/test_snowflake_table_builder.py index efbd478089..81164625f9 100644 --- a/tests/load/snowflake/test_snowflake_table_builder.py +++ b/tests/load/snowflake/test_snowflake_table_builder.py @@ -90,13 +90,3 @@ def test_create_table_with_partition_and_cluster(snowflake_client: SnowflakeClie # clustering must be the last assert sql.endswith('CLUSTER BY ("COL2","COL5")') - - -def test_cluster_alter_table_exception(snowflake_client: SnowflakeClient) -> None: - mod_update = deepcopy(TABLE_UPDATE) - # timestamp - mod_update[3]["cluster"] = True - # double cluster - with pytest.raises(DestinationSchemaWillNotUpdate) as excc: - snowflake_client._get_table_update_sql("event_test_table", mod_update, True) - assert excc.value.columns == ['"COL4"'] diff --git a/tests/load/weaviate/test_naming.py b/tests/load/weaviate/test_naming.py index e09620e91b..a965201425 100644 --- a/tests/load/weaviate/test_naming.py +++ b/tests/load/weaviate/test_naming.py @@ -1,6 +1,7 @@ import dlt, pytest from dlt.destinations.weaviate.naming import NamingConvention +from dlt.destinations.weaviate.ci_naming import NamingConvention as CINamingConvention from tests.common.utils import load_yml_case @@ -9,8 +10,8 @@ def small(): return dlt.resource([1,2,3], name="table") -def test_table_name_normalization() -> None: - n = NamingConvention() +@pytest.mark.parametrize("n", [NamingConvention(), CINamingConvention()], ids=["naming", "ci_naming"]) +def test_table_name_normalization(n: NamingConvention) -> None: assert n.normalize_table_identifier("FlatSpace") == "FlatSpace" assert n.normalize_table_identifier("a_snake_case_name") == "ASnakeCaseName" assert n.normalize_table_identifier("_a_snake_case_name") == "ASnakeCaseName" @@ -31,17 +32,34 @@ def test_property_normalization() -> None: assert n.normalize_identifier("_camelCase") == "_camelCase" assert n.normalize_identifier("_snake_case") == "_snake_case" assert n.normalize_identifier("_snake_case_") == "_snake_casex" - assert n.normalize_identifier("Snake---🛑case_") == "Snake_casex" + assert n.normalize_identifier("Snake---🛑case_") == "snake_casex" assert n.normalize_identifier("--🛑Snake---🛑case_") == "___Snake_casex" # dashes are compacted - assert 
n.normalize_identifier("Snake-______c__ase_") == "Snake_c_asex" - assert n.normalize_identifier("Snake-______c__ase_") == "Snake_c_asex" + assert n.normalize_identifier("Snake-______c__ase_") == "snake_c_asex" + assert n.normalize_identifier("Snake-______c__ase_") == "snake_c_asex" # but not the leading assert n.normalize_identifier("-______Snake-______c__ase_") == "_______Snake_c_asex" # starting digit assert n.normalize_identifier("281782918739821") == "p_281782918739821" +def test_property_normalization_ci() -> None: + n = CINamingConvention() + assert n.normalize_identifier("camelCase") == "camelcase" + assert n.normalize_identifier("_camelCase") == "_camelcase" + assert n.normalize_identifier("_snake_case") == "_snake_case" + assert n.normalize_identifier("_snake_case_") == "_snake_casex" + assert n.normalize_identifier("Snake---🛑case_") == "snake_casex" + assert n.normalize_identifier("--🛑Snake---🛑case_") == "___snake_casex" + # dashes are compacted + assert n.normalize_identifier("Snake-______c__ase_") == "snake_c_asex" + assert n.normalize_identifier("Snake-______c__ase_") == "snake_c_asex" + # but not the leading + assert n.normalize_identifier("-______Snake-______c__ase_") == "_______snake_c_asex" + # starting digit + assert n.normalize_identifier("281782918739821") == "p_281782918739821" + + def test_reserved_property_names() -> None: n = NamingConvention() assert n.normalize_identifier("id") == "__id" diff --git a/tests/load/weaviate/test_pipeline.py b/tests/load/weaviate/test_pipeline.py index aa71d73539..138a9aa87e 100644 --- a/tests/load/weaviate/test_pipeline.py +++ b/tests/load/weaviate/test_pipeline.py @@ -2,11 +2,14 @@ import dlt from dlt.common import json +from dlt.common.schema import Schema from dlt.common.utils import uniq_id from dlt.destinations.weaviate import weaviate_adapter +from dlt.destinations.weaviate.exceptions import PropertyNameConflict from dlt.destinations.weaviate.weaviate_adapter import VECTORIZE_HINT, TOKENIZATION_HINT from dlt.destinations.weaviate.weaviate_client import WeaviateClient +from dlt.pipeline.exceptions import PipelineStepFailed from tests.pipeline.utils import assert_load_info from .utils import assert_class, drop_active_pipeline_data @@ -24,6 +27,23 @@ def sequence_generator(): count += 3 +def test_adapter_and_hints() -> None: + generator_instance1 = sequence_generator() + + @dlt.resource(columns=[{"name": "content", "data_type": "text"}]) + def some_data(): + yield from next(generator_instance1) + + assert some_data.columns["content"] == {"name": "content", "data_type": "text"} + + # adapter merges with existing columns + weaviate_adapter( + some_data, + vectorize=["content"], + ) + assert some_data.columns["content"] == {"name": "content", "data_type": "text", "x-weaviate-vectorize": True} + + def test_basic_state_and_schema() -> None: generator_instance1 = sequence_generator() @@ -336,14 +356,35 @@ def test_empty_dataset_allowed() -> None: def test_vectorize_property_without_data() -> None: # we request to vectorize "content" but property with this name does not appear in the data # an incomplete column was created and it can't be created at destination - p = dlt.pipeline(destination="weaviate", full_refresh=True) - # check if we use localhost - client: WeaviateClient = p.destination_client() - if "localhost" not in client.config.credentials.url: - pytest.skip("skip to avoid race condition with other tests") + dataset_name = "without_data_" + uniq_id() + p = dlt.pipeline(destination="weaviate", dataset_name=dataset_name) - assert 
p.dataset_name is None info = p.run(weaviate_adapter(["a", "b", "c"], vectorize=["content"])) # dataset in load info is empty assert_load_info(info) assert_class(p, "Content", expected_items_count=3) + + # here we increase the abuse and try to vectorize a `Value` field, where in the data there's `value` + # in standard naming convention this results in property conflict + with pytest.raises(PipelineStepFailed) as pipe_ex: + p.run(weaviate_adapter(["a", "b", "c"], vectorize="vAlue"), primary_key="vAlue", columns={"vAlue": {"data_type": "text"}}) + assert isinstance(pipe_ex.value.__context__, PropertyNameConflict) + + # set the naming convention to case insensitive + # os.environ["SCHEMA__NAMING"] = "direct" + dlt.config["schema.naming"] = "dlt.destinations.weaviate.ci_naming" + # create new schema with changed naming convention + p = p.drop() + info = p.run(weaviate_adapter(["there are", "no stop", "words in here"], vectorize="vAlue"), primary_key="vALue", columns={"vAlue": {"data_type": "text"}}) + # dataset in load info is empty + assert_load_info(info) + # print(p.default_schema.to_pretty_yaml()) + table_schema = p.default_schema.get_table("Content") + value_column = table_schema["columns"]["value"] + assert value_column["primary_key"] is True + assert value_column["x-weaviate-vectorize"] is True + + # we forced schema change in the pipeline but weaviate does not support enabling vectorization on existing properties and classes + # so mock the class otherwise the test will not pass + value_column["x-weaviate-vectorize"] = False + assert_class(p, "Content", expected_items_count=6) diff --git a/tests/load/weaviate/test_weaviate_client.py b/tests/load/weaviate/test_weaviate_client.py index 4595d22809..8ef3ddd660 100644 --- a/tests/load/weaviate/test_weaviate_client.py +++ b/tests/load/weaviate/test_weaviate_client.py @@ -8,6 +8,7 @@ from dlt.common.utils import uniq_id from dlt.destinations import weaviate +from dlt.destinations.weaviate.exceptions import PropertyNameConflict from dlt.destinations.weaviate.weaviate_client import WeaviateClient from dlt.common.storages.file_storage import FileStorage @@ -33,8 +34,17 @@ def get_client_instance(schema: Schema) -> WeaviateClient: @pytest.fixture(scope='function') def client() -> Iterator[WeaviateClient]: + yield from make_client("naming") + + +@pytest.fixture(scope='function') +def ci_client() -> Iterator[WeaviateClient]: + yield from make_client("ci_naming") + + +def make_client(naming_convention: str) -> Iterator[WeaviateClient]: schema = Schema('test_schema', { - 'names': "dlt.destinations.weaviate.naming", + 'names': f"dlt.destinations.weaviate.{naming_convention}", 'json': None }) _client = get_client_instance(schema) @@ -61,6 +71,144 @@ def test_all_data_types(client: WeaviateClient, write_disposition: str, file_sto with io.BytesIO() as f: write_dataset(client, f, [TABLE_ROW_ALL_DATA_TYPES], TABLE_UPDATE_COLUMNS_SCHEMA) query = f.getvalue().decode() - job = expect_load_file(client, file_storage, query, class_name) - print(job) - # TODO: use assert_class to see if all data is there, extend it to check data types + expect_load_file(client, file_storage, query, class_name) + _, table_columns = client.get_storage_table("AllTypes") + # for now check if all columns are there + assert len(table_columns) == len(TABLE_UPDATE_COLUMNS_SCHEMA) + for col_name in table_columns: + assert col_name in TABLE_UPDATE_COLUMNS_SCHEMA + if TABLE_UPDATE_COLUMNS_SCHEMA[col_name]["data_type"] in ["decimal", "complex", "time"]: + # no native representation + assert 
table_columns[col_name]["data_type"] == "text" + elif TABLE_UPDATE_COLUMNS_SCHEMA[col_name]["data_type"] == "wei": + assert table_columns[col_name]["data_type"] == "double" + elif TABLE_UPDATE_COLUMNS_SCHEMA[col_name]["data_type"] == "date": + assert table_columns[col_name]["data_type"] == "timestamp" + else: + assert table_columns[col_name]["data_type"] == TABLE_UPDATE_COLUMNS_SCHEMA[col_name]["data_type"] + + +def test_case_sensitive_properties_create(client: WeaviateClient) -> None: + class_name = "col_class" + # we have two properties which will map to the same name in Weaviate + table_create = [ + { + "name": "col1", + "data_type": "bigint", + "nullable": False + }, + { + "name": "coL1", + "data_type": "double", + "nullable": False + }, + ] + client.schema.update_schema(client.schema.normalize_table_identifiers(new_table(class_name, columns=table_create))) + client.schema.bump_version() + with pytest.raises(PropertyNameConflict): + client.update_stored_schema() + + +def test_case_insensitive_properties_create(ci_client: WeaviateClient) -> None: + class_name = "col_class" + # we have two properties which will map to the same name in Weaviate + table_create = [ + { + "name": "col1", + "data_type": "bigint", + "nullable": False + }, + { + "name": "coL1", + "data_type": "double", + "nullable": False + }, + ] + ci_client.schema.update_schema(ci_client.schema.normalize_table_identifiers(new_table(class_name, columns=table_create))) + ci_client.schema.bump_version() + ci_client.update_stored_schema() + _, table_columns = ci_client.get_storage_table("ColClass") + # later column overwrites earlier one so: double + assert table_columns == {'col1': {'name': 'col1', 'data_type': 'double'}} + + +def test_case_sensitive_properties_add(client: WeaviateClient) -> None: + class_name = "col_class" + # we have two properties which will map to the same name in Weaviate + table_create = [{ + "name": "col1", + "data_type": "bigint", + "nullable": False + }] + table_update = [{ + "name": "coL1", + "data_type": "double", + "nullable": False + }, + ] + client.schema.update_schema( + client.schema.normalize_table_identifiers(new_table(class_name, columns=table_create)) + ) + client.schema.bump_version() + client.update_stored_schema() + + client.schema.update_schema( + client.schema.normalize_table_identifiers(new_table(class_name, columns=table_update)) + ) + client.schema.bump_version() + with pytest.raises(PropertyNameConflict): + client.update_stored_schema() + + # _, table_columns = client.get_storage_table("ColClass") + # print(table_columns) + + +def test_load_case_sensitive_data(client: WeaviateClient, file_storage: FileStorage) -> None: + class_name = "col_class" + # we have two properties which will map to the same name in Weaviate + table_create = {"col1": + { + "name": "col1", + "data_type": "bigint", + "nullable": False + }} + client.schema.update_schema(new_table(class_name, columns=[table_create["col1"]])) + client.schema.bump_version() + client.update_stored_schema() + # prepare a data item where is name clash due to Weaviate being CI + data_clash = {"col1": 72187328, "coL1": 726171} + # write row + with io.BytesIO() as f: + write_dataset(client, f, [data_clash], table_create) + query = f.getvalue().decode() + with pytest.raises(PropertyNameConflict): + expect_load_file(client, file_storage, query, class_name) + + +def test_load_case_sensitive_data_ci(ci_client: WeaviateClient, file_storage: FileStorage) -> None: + class_name = "col_class" + # we have two properties which will map to the same 
name in Weaviate + table_create = {"col1": + { + "name": "col1", + "data_type": "bigint", + "nullable": False + }} + ci_client.schema.update_schema(new_table(class_name, columns=[table_create["col1"]])) + ci_client.schema.bump_version() + ci_client.update_stored_schema() + # prepare a data item where is name clash due to Weaviate being CI + # but here we normalize the item + data_clash = list( + ci_client.schema.normalize_data_item({"col1": 72187328, "coL1": 726171}, "_load_id_", "col_class") + )[0][1] + + # write row + with io.BytesIO() as f: + write_dataset(ci_client, f, [data_clash], table_create) + query = f.getvalue().decode() + expect_load_file(ci_client, file_storage, query, class_name) + response = ci_client.query_class(class_name, ["col1"]).do() + objects = response["data"]["Get"][ci_client.make_qualified_class_name(class_name)] + # the latter of conflicting fields is stored (so data is lost) + assert objects == [{'col1': 726171}] diff --git a/tests/load/weaviate/utils.py b/tests/load/weaviate/utils.py index 7f40443ca2..26e1bba135 100644 --- a/tests/load/weaviate/utils.py +++ b/tests/load/weaviate/utils.py @@ -52,7 +52,7 @@ def assert_class( # response = db_client.query.get(class_name, list(properties.keys())).do() response = client.query_class(class_name, list(properties.keys())).do() - objects = response["data"]["Get"][client.make_full_name(class_name)] + objects = response["data"]["Get"][client.make_qualified_class_name(class_name)] if expected_items_count is not None: assert expected_items_count == len(objects) diff --git a/tests/pipeline/test_pipeline.py b/tests/pipeline/test_pipeline.py index 16542e8e3a..c668d81073 100644 --- a/tests/pipeline/test_pipeline.py +++ b/tests/pipeline/test_pipeline.py @@ -18,6 +18,7 @@ from dlt.common.exceptions import DestinationHasFailedJobs, DestinationTerminalException, PipelineStateNotAvailable, UnknownDestinationModule from dlt.common.pipeline import PipelineContext from dlt.common.runtime.collector import AliveCollector, EnlightenCollector, LogCollector, TqdmCollector +from dlt.common.schema.utils import new_column from dlt.common.utils import uniq_id from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted @@ -902,10 +903,10 @@ def test_extract_add_tables() -> None: # we extract and make sure that tables are added to schema s = airtable_emojis() assert list(s.resources.keys()) == ["💰Budget", "📆 Schedule", "🦚Peacock", "🦚WidePeacock"] - assert s.resources["🦚Peacock"].table_schema()["resource"] == "🦚Peacock" + assert s.resources["🦚Peacock"].compute_table_schema()["resource"] == "🦚Peacock" # only name will be normalized - assert s.resources["🦚Peacock"].table_schema()["name"] == "🦚Peacock" - assert s.resources["💰Budget"].table_schema()["columns"]["🔑book_id"]["name"] == "🔑book_id" + assert s.resources["🦚Peacock"].compute_table_schema()["name"] == "🦚Peacock" + assert s.resources["💰Budget"].compute_table_schema()["columns"]["🔑book_id"]["name"] == "🔑book_id" pipeline = dlt.pipeline(pipeline_name="emojis", destination="dummy") info = pipeline.extract(s) assert info.extract_data_info[0]["name"] == "airtable_emojis" @@ -971,6 +972,39 @@ def test_emojis_resource_names() -> None: assert table["resource"] == "🦚WidePeacock" +def test_apply_hints_infer_hints() -> None: + os.environ["COMPLETED_PROB"] = "1.0" + + @dlt.source + def infer(): + yield dlt.resource([{"id": 1, "timestamp": "NOW"}], name="table1", columns=[new_column("timestamp", nullable=True)]) + + new_new_hints = { + "not_null": ["timestamp"], + 
"primary_key": ["id"] + } + s = infer() + s.schema.merge_hints(new_new_hints) + pipeline = dlt.pipeline(pipeline_name="inf", destination="dummy") + pipeline.run(s) + # check schema + table = pipeline.default_schema.get_table("table1") + # nullable True coming from hint overrides inferred hint + assert table["columns"]["timestamp"] == {"name": "timestamp", "data_type": "text", "nullable": True} + # fully from data + assert table["columns"]["id"] == {"name": "id", "data_type": "bigint", "nullable": True, "primary_key": True} + + # remove primary key and change nullable + s = infer() + s.table1.apply_hints(columns=[{"name": "timestamp", "nullable": False}, {"name": "id", "nullable": False, "primary_key": False}]) + pipeline.run(s) + table = pipeline.default_schema.get_table("table1") + # hints overwrite pipeline schema + assert table["columns"]["timestamp"] == {"name": "timestamp", "data_type": "text", "nullable": False} + assert table["columns"]["id"] == {"name": "id", "data_type": "bigint", "nullable": False, "primary_key": False} + # print(pipeline.default_schema.to_pretty_yaml()) + + def test_invalid_data_edge_cases() -> None: # pass not evaluated source function @dlt.source