Skip to content

Commit

Permalink
made table index type logic Synapse specific through destination adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorrit Sandbrink committed Jan 26, 2024
1 parent 014543a commit 7868ca6
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 82 deletions.
4 changes: 1 addition & 3 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from dlt.common.schema import Schema, TTableSchema, TSchemaTables
from dlt.common.schema.typing import TWriteDisposition
from dlt.common.schema.exceptions import InvalidDatasetName
from dlt.common.schema.utils import get_write_disposition, get_table_format, get_table_index_type
from dlt.common.schema.utils import get_write_disposition, get_table_format
from dlt.common.configuration import configspec, with_config, resolve_configuration, known_sections
from dlt.common.configuration.specs import BaseConfiguration, CredentialsConfiguration
from dlt.common.configuration.accessors import config
Expand Down Expand Up @@ -372,8 +372,6 @@ def get_load_table(self, table_name: str, prepare_for_staging: bool = False) ->
table["write_disposition"] = get_write_disposition(self.schema.tables, table_name)
if "table_format" not in table:
table["table_format"] = get_table_format(self.schema.tables, table_name)
if "table_index_type" not in table:
table["table_index_type"] = get_table_index_type(self.schema.tables, table_name)
return table
except KeyError:
raise UnknownTableException(table_name)
Expand Down
3 changes: 0 additions & 3 deletions dlt/common/schema/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@
"""Known hints of a column used to declare hint regexes."""
TWriteDisposition = Literal["skip", "append", "replace", "merge"]
TTableFormat = Literal["iceberg"]
TTableIndexType = Literal["heap", "clustered_columnstore_index"]
"Table index type. Currently only used for Synapse destination."
TTypeDetections = Literal[
"timestamp", "iso_timestamp", "iso_date", "large_integer", "hexbytes_to_text", "wei_to_double"
]
Expand Down Expand Up @@ -167,7 +165,6 @@ class TTableSchema(TypedDict, total=False):
columns: TTableSchemaColumns
resource: Optional[str]
table_format: Optional[TTableFormat]
table_index_type: Optional[TTableIndexType]


class TPartialTableSchema(TTableSchema):
Expand Down
12 changes: 0 additions & 12 deletions dlt/common/schema/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
TColumnSchema,
TColumnProp,
TTableFormat,
TTableIndexType,
TColumnHint,
TTypeDetectionFunc,
TTypeDetections,
Expand Down Expand Up @@ -619,14 +618,6 @@ def get_table_format(tables: TSchemaTables, table_name: str) -> TTableFormat:
)


def get_table_index_type(tables: TSchemaTables, table_name: str) -> TTableIndexType:
"""Returns table index type of a table if present. If not, looks up into parent table."""
return cast(
TTableIndexType,
get_inherited_table_hint(tables, table_name, "table_index_type", allow_none=True),
)


def table_schema_has_type(table: TTableSchema, _typ: TDataType) -> bool:
"""Checks if `table` schema contains column with type _typ"""
return any(c.get("data_type") == _typ for c in table["columns"].values())
Expand Down Expand Up @@ -733,7 +724,6 @@ def new_table(
resource: str = None,
schema_contract: TSchemaContract = None,
table_format: TTableFormat = None,
table_index_type: TTableIndexType = None,
) -> TTableSchema:
table: TTableSchema = {
"name": table_name,
Expand All @@ -752,8 +742,6 @@ def new_table(
table["schema_contract"] = schema_contract
if table_format:
table["table_format"] = table_format
if table_index_type is not None:
table["table_index_type"] = table_index_type
if validate_schema:
validate_dict_ignoring_xkeys(
spec=TColumnSchema,
Expand Down
3 changes: 2 additions & 1 deletion dlt/destinations/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

from dlt.destinations.impl.weaviate import weaviate_adapter
from dlt.destinations.impl.qdrant import qdrant_adapter
from dlt.destinations.impl.synapse import synapse_adapter

__all__ = ["weaviate_adapter", "qdrant_adapter"]
__all__ = ["weaviate_adapter", "qdrant_adapter", "synapse_adapter"]
11 changes: 2 additions & 9 deletions dlt/destinations/impl/qdrant/qdrant_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.extract import DltResource, resource as make_resource
from dlt.destinations.utils import ensure_resource

VECTORIZE_HINT = "x-qdrant-embed"

Expand Down Expand Up @@ -31,15 +32,7 @@ def qdrant_adapter(
>>> qdrant_adapter(data, embed="description")
[DltResource with hints applied]
"""
# wrap `data` in a resource if not an instance already
resource: DltResource
if not isinstance(data, DltResource):
resource_name: str = None
if not hasattr(data, "__name__"):
resource_name = "content"
resource = make_resource(data, name=resource_name)
else:
resource = data
resource = ensure_resource(data)

column_hints: TTableSchemaColumns = {}

Expand Down
2 changes: 2 additions & 0 deletions dlt/destinations/impl/synapse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
from dlt.common.wei import EVM_DECIMAL_PRECISION

from dlt.destinations.impl.synapse.synapse_adapter import synapse_adapter


def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()
Expand Down
4 changes: 3 additions & 1 deletion dlt/destinations/impl/synapse/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dlt.common import logger
from dlt.common.configuration import configspec
from dlt.common.schema.typing import TTableIndexType, TSchemaTables
from dlt.common.schema.typing import TSchemaTables
from dlt.common.schema.utils import get_write_disposition

from dlt.destinations.impl.mssql.configuration import (
Expand All @@ -11,6 +11,8 @@
)
from dlt.destinations.impl.mssql.configuration import MsSqlCredentials

from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType


@configspec
class SynapseCredentials(MsSqlCredentials):
Expand Down
4 changes: 2 additions & 2 deletions dlt/destinations/impl/synapse/factory.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import typing as t

from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.schema.typing import TTableIndexType
from dlt.destinations.impl.synapse import capabilities

from dlt.destinations.impl.synapse import capabilities
from dlt.destinations.impl.synapse.configuration import (
SynapseCredentials,
SynapseClientConfiguration,
)
from dlt.destinations.impl.synapse.synapse_adapter import TTableIndexType

if t.TYPE_CHECKING:
from dlt.destinations.impl.synapse.synapse import SynapseClient
Expand Down
23 changes: 17 additions & 6 deletions dlt/destinations/impl/synapse/synapse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
)

from dlt.common.schema import TTableSchema, TColumnSchema, Schema, TColumnHint
from dlt.common.schema.utils import table_schema_has_type
from dlt.common.schema.typing import TTableSchemaColumns, TTableIndexType
from dlt.common.schema.utils import table_schema_has_type, get_inherited_table_hint
from dlt.common.schema.typing import TTableSchemaColumns

from dlt.common.configuration.specs import AzureCredentialsWithoutDefaults

Expand All @@ -34,6 +34,10 @@
from dlt.destinations.impl.synapse import capabilities
from dlt.destinations.impl.synapse.sql_client import SynapseSqlClient
from dlt.destinations.impl.synapse.configuration import SynapseClientConfiguration
from dlt.destinations.impl.synapse.synapse_adapter import (
TABLE_INDEX_TYPE_HINT,
TTableIndexType,
)


HINT_TO_SYNAPSE_ATTR: Dict[TColumnHint, str] = {
Expand Down Expand Up @@ -68,7 +72,7 @@ def _get_table_update_sql(
if table is None:
table_index_type = self.config.default_table_index_type
else:
table_index_type = table.get("table_index_type")
table_index_type = cast(TTableIndexType, table.get(TABLE_INDEX_TYPE_HINT))
if table_index_type == "clustered_columnstore_index":
new_columns = self._get_columstore_valid_columns(new_columns)

Expand Down Expand Up @@ -128,9 +132,16 @@ def get_load_table(self, table_name: str, staging: bool = False) -> TTableSchema
# configuration. Why? "For small lookup tables, less than 60 million rows,
# consider using HEAP or clustered index for faster query performance."
# https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index#heap-tables
table["table_index_type"] = "heap"
if table["table_index_type"] is None:
table["table_index_type"] = self.config.default_table_index_type
table[TABLE_INDEX_TYPE_HINT] = "heap" # type: ignore[typeddict-unknown-key]
elif table_name in self.schema.data_table_names():
if TABLE_INDEX_TYPE_HINT not in table:
# If present in parent table, fetch hint from there.
table[TABLE_INDEX_TYPE_HINT] = get_inherited_table_hint( # type: ignore[typeddict-unknown-key]
self.schema.tables, table_name, TABLE_INDEX_TYPE_HINT, allow_none=True
)
if table[TABLE_INDEX_TYPE_HINT] is None: # type: ignore[typeddict-item]
# Hint still not defined, fall back to default.
table[TABLE_INDEX_TYPE_HINT] = self.config.default_table_index_type # type: ignore[typeddict-unknown-key]
return table

def get_storage_table_index_type(self, table_name: str) -> TTableIndexType:
Expand Down
50 changes: 50 additions & 0 deletions dlt/destinations/impl/synapse/synapse_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Any, Literal, Set, get_args, Final

from dlt.extract import DltResource, resource as make_resource
from dlt.extract.typing import TTableHintTemplate
from dlt.extract.hints import TResourceHints
from dlt.destinations.utils import ensure_resource

TTableIndexType = Literal["heap", "clustered_columnstore_index"]
"""
Table [index type](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-tables-index) used when creating the Synapse table.
This regards indexes specified at the table level, not the column level.
"""
TABLE_INDEX_TYPES: Set[TTableIndexType] = set(get_args(TTableIndexType))

TABLE_INDEX_TYPE_HINT: Literal["x-table-index-type"] = "x-table-index-type"


def synapse_adapter(data: Any, table_index_type: TTableIndexType = None) -> DltResource:
"""Prepares data for the Synapse destination by specifying which table index
type should be used.

Args:
data (Any): The data to be transformed. It can be raw data or an instance
of DltResource. If raw data, the function wraps it into a DltResource
object.
table_index_type (TTableIndexType, optional): The table index type used when creating
the Synapse table.

Returns:
DltResource: A resource with applied Synapse-specific hints.

Raises:
ValueError: If input for `table_index_type` is invalid.

Examples:
>>> data = [{"name": "Anush", "description": "Integrations Hacker"}]
>>> synapse_adapter(data, table_index_type="clustered_columnstore_index")
[DltResource with hints applied]
"""
resource = ensure_resource(data)

if table_index_type is not None:
if table_index_type not in TABLE_INDEX_TYPES:
allowed_types = ", ".join(TABLE_INDEX_TYPES)
raise ValueError(
f"Table index type {table_index_type} is invalid. Allowed table index"
f" types are: {allowed_types}."
)
resource._hints[TABLE_INDEX_TYPE_HINT] = table_index_type # type: ignore[typeddict-unknown-key]
return resource
11 changes: 2 additions & 9 deletions dlt/destinations/impl/weaviate/weaviate_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from dlt.common.schema.typing import TColumnNames, TTableSchemaColumns
from dlt.extract import DltResource, resource as make_resource
from dlt.destinations.utils import ensure_resource

TTokenizationTMethod = Literal["word", "lowercase", "whitespace", "field"]
TOKENIZATION_METHODS: Set[TTokenizationTMethod] = set(get_args(TTokenizationTMethod))
Expand Down Expand Up @@ -53,15 +54,7 @@ def weaviate_adapter(
>>> weaviate_adapter(data, vectorize="description", tokenization={"description": "word"})
[DltResource with hints applied]
"""
# wrap `data` in a resource if not an instance already
resource: DltResource
if not isinstance(data, DltResource):
resource_name: str = None
if not hasattr(data, "__name__"):
resource_name = "content"
resource = make_resource(data, name=resource_name)
else:
resource = data
resource = ensure_resource(data)

column_hints: TTableSchemaColumns = {}
if vectorize:
Expand Down
16 changes: 16 additions & 0 deletions dlt/destinations/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any

from dlt.extract import DltResource, resource as make_resource


def ensure_resource(data: Any) -> DltResource:
"""Wraps `data` in a DltResource if it's not a DltResource already."""
resource: DltResource
if not isinstance(data, DltResource):
resource_name: str = None
if not hasattr(data, "__name__"):
resource_name = "content"
resource = make_resource(data, name=resource_name)
else:
resource = data
return resource
7 changes: 0 additions & 7 deletions dlt/extract/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
TAnySchemaColumns,
TSchemaContract,
TTableFormat,
TTableIndexType,
)
from dlt.extract.utils import (
ensure_table_schema_columns_hint,
Expand Down Expand Up @@ -257,7 +256,6 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
) -> DltResource: ...
Expand All @@ -275,7 +273,6 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
) -> Callable[[Callable[TResourceFunParams, Any]], DltResource]: ...
Expand All @@ -293,7 +290,6 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: Literal[True] = True,
Expand All @@ -312,7 +308,6 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
) -> DltResource: ...
Expand All @@ -329,7 +324,6 @@ def resource(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
selected: bool = True,
spec: Type[BaseConfiguration] = None,
standalone: bool = False,
Expand Down Expand Up @@ -409,7 +403,6 @@ def make_resource(
merge_key=merge_key,
schema_contract=schema_contract,
table_format=table_format,
table_index_type=table_index_type,
)
return DltResource.from_data(
_data,
Expand Down
3 changes: 0 additions & 3 deletions dlt/extract/hints.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
TWriteDisposition,
TAnySchemaColumns,
TTableFormat,
TTableIndexType,
TSchemaContract,
)
from dlt.common.typing import TDataItem
Expand Down Expand Up @@ -275,7 +274,6 @@ def new_table_template(
merge_key: TTableHintTemplate[TColumnNames] = None,
schema_contract: TTableHintTemplate[TSchemaContract] = None,
table_format: TTableHintTemplate[TTableFormat] = None,
table_index_type: TTableHintTemplate[TTableIndexType] = None,
) -> TResourceHints:
validator, schema_contract = create_item_validator(columns, schema_contract)
clean_columns = columns
Expand All @@ -291,7 +289,6 @@ def new_table_template(
columns=clean_columns, # type: ignore
schema_contract=schema_contract, # type: ignore
table_format=table_format, # type: ignore
table_index_type=table_index_type, # type: ignore
)
if not table_name:
new_template.pop("name")
Expand Down
Loading

0 comments on commit 7868ca6

Please sign in to comment.