diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py index 21c9eefe63..b5d9624b97 100644 --- a/dlt/destinations/impl/clickhouse/clickhouse.py +++ b/dlt/destinations/impl/clickhouse/clickhouse.py @@ -1,15 +1,37 @@ -from typing import ClassVar, Optional +from copy import deepcopy +from typing import ClassVar, Optional, Dict, List, Sequence from dlt.common.destination import DestinationCapabilitiesContext from dlt.common.destination.reference import SupportsStagingDestination from dlt.common.schema import Schema, TColumnSchema -from dlt.common.schema.typing import TColumnType, TTableFormat +from dlt.common.schema.typing import TTableFormat, TTableSchema, TColumnHint, TColumnType from dlt.destinations.impl.clickhouse import capabilities -from dlt.destinations.impl.clickhouse.configuration import ClickhouseClientConfiguration -from dlt.destinations.job_client_impl import SqlJobClientWithStaging -from dlt.destinations.sql_client import SqlClientBase +from dlt.destinations.impl.clickhouse.clickhouse_adapter import ( + TTableEngineType, + TABLE_ENGINE_TYPE_HINT, +) +from dlt.destinations.impl.clickhouse.configuration import ( + ClickhouseClientConfiguration, + ClickhouseCredentials, +) +from dlt.destinations.impl.clickhouse.sql_client import ClickhouseSqlClient +from dlt.destinations.job_client_impl import ( + SqlJobClientWithStaging, + CopyRemoteFileLoadJob, + SqlJobClientBase, +) +from dlt.destinations.sql_jobs import SqlMergeJob from dlt.destinations.type_mapping import TypeMapper -from dlt.destinations.typing import TNativeConn + + +HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = { + "primary_key": "PRIMARY KEY", +} + +TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = { + "merge_tree": "MergeTree", + "replicated_merge_tree": "ReplicatedMergeTree", +} class ClickhouseTypeMapper(TypeMapper): @@ -46,6 +68,25 @@ def to_db_time_type(self, precision: Optional[int], table_format: TTableFormat = return "DateTime" +class ClickhouseCopyFileLoadJob(CopyRemoteFileLoadJob): + def __init__( + self, + table: TTableSchema, + file_path: str, + sql_client: ClickhouseSqlClient, + staging_credentials: Optional[ClickhouseCredentials] = None, + staging_iam_role: str = None, + ) -> None: + self._staging_iam_role = staging_iam_role + super().__init__(table, file_path, sql_client, staging_credentials) + + def exception(self) -> str: + pass + + +class ClickhouseMergeJob(SqlMergeJob): ... + + class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination): capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities() @@ -53,15 +94,61 @@ def __init__( self, schema: Schema, config: ClickhouseClientConfiguration, - sql_client: SqlClientBase[TNativeConn], ) -> None: - super().__init__(schema, config, sql_client) - ... + self.config: ClickhouseClientConfiguration = config + # TODO: There are no schemas in Clickhouse. No point in having schemas, only dataset names and table names for example "dataset1_mytable". + self.sql_client = ClickhouseSqlClient( + self.config.normalize_dataset_name(self.schema), self.config.credentials + ) + super().__init__(schema, self.config, self.sql_client) + self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {} + self.type_mapper = ClickhouseTypeMapper(self.capabilities) + + def _get_table_update_sql( + self, table_name: str, new_columns: Sequence[TColumnSchema], generate_alter: bool + ) -> List[str]: + table: TTableSchema = self.prepare_load_table(table_name, self.in_staging_mode) + sql = SqlJobClientBase._get_table_update_sql(self, table_name, new_columns, generate_alter) + + if generate_alter: + return sql + + # TODO: Remove `unique` and `primary_key` default implementations. + if primary_key_list := [ + self.capabilities.escape_identifier(c["name"]) + for c in new_columns + if c.get("primary_key") + ]: + sql[0] += "\nPRIMARY KEY (" + ", ".join(primary_key_list) + ")" + else: + sql[0] += "\nPRIMARY KEY tuple()" + + # Default to 'ReplicatedMergeTree' if user didn't explicitly set a table engine hint. + # 'ReplicatedMergeTree' is the only supported engine for Clickhouse Cloud. + sql[0] = f"{sql[0]}\nENGINE = {table.get(TABLE_ENGINE_TYPE_HINT, 'replicated_merge_tree')}" + + return sql def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str: - pass + # The primary key definition is defined outside column specification. + hints_str = " ".join( + self.active_hints.get(hint, "") + for hint in self.active_hints.keys() + if c.get(hint, False) is True and hint != "primary_key" + ) + return ( + f"{self.capabilities.escape_identifier(c['name'])} " + f"{self.type_mapper.to_db_type(c)} " + f"{hints_str} " + f"{self._gen_not_null(c.get('nullable', True))}" + ) + + # Clickhouse fields are not nullable by default. + @staticmethod + def _gen_not_null(v: bool) -> str: + return "NULL" if v else "NOT NULL" def _from_db_type( - self, db_type: str, precision: Optional[int], scale: Optional[int] + self, ch_t: str, precision: Optional[int], scale: Optional[int] ) -> TColumnType: - pass + return self.type_mapper.from_db_type(ch_t, precision, scale) diff --git a/dlt/destinations/impl/clickhouse/clickhouse_adapter.py b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py new file mode 100644 index 0000000000..a2e8d39c03 --- /dev/null +++ b/dlt/destinations/impl/clickhouse/clickhouse_adapter.py @@ -0,0 +1,59 @@ +from typing import Any, Literal, Set, get_args, Dict + +from dlt.destinations.utils import ensure_resource +from dlt.extract import DltResource +from dlt.extract.items import TTableHintTemplate + + +TTableEngineType = Literal["merge_tree", "replicated_merge_tree"] + +""" +The table engine (type of table) determines: + +- How and where data is stored, where to write it to, and where to read it from. +- Which queries are supported, and how. +- Concurrent data access. +- Use of indexes, if present. +- Whether multithread request execution is possible. +- Data replication parameters. + +See https://clickhouse.com/docs/en/engines/table-engines. +""" +TABLE_ENGINE_TYPES: Set[TTableEngineType] = set(get_args(TTableEngineType)) +TABLE_ENGINE_TYPE_HINT: Literal["x-table-engine-type"] = "x-table-engine-type" + +def clickhouse_adapter(data: Any, table_engine_type: TTableEngineType = None) -> DltResource: + """Prepares data for the Clickhouse destination by specifying which table engine type + that should be used. + + Args: + data (Any): The data to be transformed. It can be raw data or an instance + of DltResource. If raw data, the function wraps it into a DltResource + object. + table_engine_type (TTableEngineType, optional): The table index type used when creating + the Synapse table. + + Returns: + DltResource: A resource with applied Synapse-specific hints. + + Raises: + ValueError: If input for `table_engine_type` is invalid. + + Examples: + >>> data = [{"name": "Alice", "description": "Software Developer"}] + >>> clickhouse_adapter(data, table_engine_type="merge_tree") + [DltResource with hints applied] + """ + resource = ensure_resource(data) + + additional_table_hints: Dict[str, TTableHintTemplate[Any]] = {} + if table_engine_type is not None: + if table_engine_type not in TABLE_ENGINE_TYPES: + allowed_types = ", ".join(TABLE_ENGINE_TYPES) + raise ValueError( + f"Table engine type {table_engine_type} is invalid. Allowed table engine types are:" + f" {allowed_types}." + ) + additional_table_hints[TABLE_ENGINE_TYPE_HINT] = table_engine_type + resource.apply_hints(additional_table_hints=additional_table_hints) + return resource diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py index bedd41bc2e..359f08e47e 100644 --- a/dlt/destinations/impl/clickhouse/configuration.py +++ b/dlt/destinations/impl/clickhouse/configuration.py @@ -64,7 +64,8 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration # columns within the same granule. # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes create_indexes: bool = False - """Whether `primary_key` and `unique` column hints are applied.""" + """Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint, + and primary keys don't guarantee uniqueness.""" __config_gen_annotations__: ClassVar[List[str]] = ["create_indexes"] diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py index 84d3000832..1323dc2da9 100644 --- a/dlt/destinations/impl/clickhouse/sql_client.py +++ b/dlt/destinations/impl/clickhouse/sql_client.py @@ -93,11 +93,13 @@ def execute_sql( @contextmanager @raise_database_error - def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]: - with self._conn.cursor() as curr: + def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[ClickhouseDBApiCursorImpl]: + cur: clickhouse_driver.dbapi.connection.Cursor + with self._conn.cursor() as cur: try: - curr.execute(query, args or (kwargs or None)) - yield ClickhouseDBApiCursorImpl(curr) # type: ignore[abstract] + # TODO: Clickhouse driver only accepts pyformat `...WHERE name=%(name)s` parameter marker arguments. + cur.execute(query, args or (kwargs or None)) + yield ClickhouseDBApiCursorImpl(cur) # type: ignore[abstract] except clickhouse_driver.dbapi.Error: self.close_connection() self.open_connection()