From c9b1ae4648750f49f5be31e96a6c20d47d028b6d Mon Sep 17 00:00:00 2001
From: Marcel Coetzee
Date: Tue, 19 Mar 2024 23:58:17 +0200
Subject: [PATCH] Pass basic tests #1055

Signed-off-by: Marcel Coetzee
---
 dlt/common/data_writers/escape.py             |   2 +-
 dlt/destinations/impl/clickhouse/__init__.py  |   4 +-
 .../impl/clickhouse/clickhouse.py             |  53 ++++++---
 .../impl/clickhouse/configuration.py          |   6 -
 dlt/destinations/impl/clickhouse/factory.py   |   4 -
 .../impl/clickhouse/sql_client.py             |  17 ++-
 .../test_clickhouse_table_builder.py          | 111 ++++++++++++------
 7 files changed, 128 insertions(+), 69 deletions(-)

diff --git a/dlt/common/data_writers/escape.py b/dlt/common/data_writers/escape.py
index 027e7b1554..153e66722c 100644
--- a/dlt/common/data_writers/escape.py
+++ b/dlt/common/data_writers/escape.py
@@ -174,4 +174,4 @@ def escape_clickhouse_literal(v: Any) -> Any:
 
 
 def escape_clickhouse_identifier(v: str) -> str:
-    return "`" + v.replace("`", "``").replace("\\", "\\\\") + '"'
+    return "`" + v.replace("`", "``").replace("\\", "\\\\") + "`"
diff --git a/dlt/destinations/impl/clickhouse/__init__.py b/dlt/destinations/impl/clickhouse/__init__.py
index acbb08ac9a..9a3560223c 100644
--- a/dlt/destinations/impl/clickhouse/__init__.py
+++ b/dlt/destinations/impl/clickhouse/__init__.py
@@ -7,9 +7,9 @@ def capabilities() -> DestinationCapabilitiesContext:
     caps = DestinationCapabilitiesContext()
     caps.preferred_loader_file_format = "jsonl"
-    caps.supported_loader_file_formats = ["jsonl"]
+    caps.supported_loader_file_formats = ["jsonl", "parquet"]
     caps.preferred_staging_file_format = "jsonl"
-    caps.supported_staging_file_formats = ["jsonl"]
+    caps.supported_staging_file_formats = ["jsonl", "parquet"]
     caps.escape_identifier = escape_clickhouse_identifier
     caps.escape_literal = escape_clickhouse_literal
diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index 3b901d38a2..097224cb18 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,6 +1,7 @@
+import logging
 import os
 from copy import deepcopy
-from typing import ClassVar, Optional, Dict, List, Sequence
+from typing import ClassVar, Optional, Dict, List, Sequence, cast
 from urllib.parse import urlparse
 
 from dlt.common.configuration.specs import (
@@ -41,6 +42,8 @@
 
 HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = {
     "primary_key": "PRIMARY KEY",
+    "unique": "",  # No unique constraints available in Clickhouse.
+    "foreign_key": "",  # No foreign key constraints in Clickhouse.
 }
 
 TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
@@ -51,12 +54,13 @@
 
 class ClickhouseTypeMapper(TypeMapper):
     sct_to_unbound_dbt = {
-        "complex": "JSON",
+        "complex": "String",
         "text": "String",
         "double": "Float64",
         "bool": "Boolean",
         "date": "Date",
-        "timestamp": "DateTime",
+        "timestamp": "DateTime('UTC')",
+        "time": "Time('UTC')",
         "bigint": "Int64",
         "binary": "String",
         "wei": "Decimal",
@@ -65,7 +69,8 @@ class ClickhouseTypeMapper(TypeMapper):
     sct_to_dbt = {
         "decimal": "Decimal(%i,%i)",
         "wei": "Decimal(%i,%i)",
-        "timestamp": "DateTime(%i)",
+        "timestamp": "DateTime(%i, 'UTC')",
+        "time": "Time(%i, 'UTC')",
     }
 
     dbt_to_sct = {
@@ -74,6 +79,9 @@
         "Boolean": "bool",
         "Date": "date",
         "DateTime": "timestamp",
+        "DateTime('UTC')": "timestamp",
+        "Time": "timestamp",
+        "Time('UTC')": "timestamp",
         "Int64": "bigint",
         "JSON": "complex",
         "Decimal": "decimal",
@@ -190,7 +198,7 @@ def __init__(
         )
         super().__init__(schema, config, self.sql_client)
         self.config: ClickhouseClientConfiguration = config
-        self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {}
+        self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
         self.type_mapper = ClickhouseTypeMapper(self.capabilities)
 
     def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
@@ -213,10 +221,11 @@ def _get_table_update_sql(
             return sql
 
         # Default to 'ReplicatedMergeTree' if user didn't explicitly set a table engine hint.
-        # 'ReplicatedMergeTree' is the only supported engine for Clickhouse Cloud.
-        sql[0] = f"{sql[0]}\nENGINE = {table.get(TABLE_ENGINE_TYPE_HINT, 'replicated_merge_tree')}"
+        table_type = cast(
+            TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
+        )
+        sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}"
 
-        # TODO: Remove `unique` and `primary_key` default implementations.
         if primary_key_list := [
             self.capabilities.escape_identifier(c["name"])
             for c in new_columns
@@ -226,26 +235,38 @@
         else:
             sql[0] += "\nPRIMARY KEY tuple()"
 
+        # TODO: Apply sort order and cluster key hints.
+
         return sql
 
     def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
-        # The primary key definition is defined outside column specification.
+        # Build column definition.
+        # The primary key and sort order are defined outside the column specification.
         hints_str = " ".join(
-            self.active_hints.get(hint, "")
+            self.active_hints.get(hint)
             for hint in self.active_hints.keys()
-            if c.get(hint, False) is True and hint != "primary_key"
+            if c.get(hint, False) is True
+            and hint not in ("primary_key", "sort")
+            and hint in self.active_hints
         )
+
+        # ALTER TABLE statements only accept `Nullable` type modifiers.
+        type_with_nullability_modifier = (
+            f"Nullable({self.type_mapper.to_db_type(c)})"
+            if c.get("nullable", True)
+            else self.type_mapper.to_db_type(c)
+        )
+
         return (
-            f"{self.capabilities.escape_identifier(c['name'])} "
-            f"{self.type_mapper.to_db_type(c)} "
-            f"{hints_str} "
-            f"{self._gen_not_null(c.get('nullable', True))}"
+            f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_str}"
+            .strip()
        )
 
     # Clickhouse fields are not nullable by default.
     @staticmethod
     def _gen_not_null(v: bool) -> str:
-        return "NULL" if v else "NOT NULL"
+        # Nullability is expressed with the `Nullable` type modifier instead of NULL / NOT NULL, to cater for ALTER statements.
+        return ""
 
     def _from_db_type(
         self, ch_t: str, precision: Optional[int], scale: Optional[int]
diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py
index 35bf130d42..fd5b3276c0 100644
--- a/dlt/destinations/impl/clickhouse/configuration.py
+++ b/dlt/destinations/impl/clickhouse/configuration.py
@@ -63,11 +63,6 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
     # but they do not enforce uniqueness constraints. It permits duplicate values even for the primary key
     # columns within the same granule.
     # See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
-    create_indexes: bool = True
-    """Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint,
-    and primary keys don't guarantee uniqueness."""
-
-    __config_gen_annotations__: ClassVar[List[str]] = ["create_indexes"]
 
     def fingerprint(self) -> str:
         """Returns a fingerprint of host part of a connection string."""
@@ -82,7 +77,6 @@ def __init__(
         *,
         credentials: ClickhouseCredentials = None,
         dataset_name: str = None,
-        create_indexes: bool = True,
         destination_name: str = None,
         environment: str = None
     ) -> None:
diff --git a/dlt/destinations/impl/clickhouse/factory.py b/dlt/destinations/impl/clickhouse/factory.py
index 90065c6582..2242d30565 100644
--- a/dlt/destinations/impl/clickhouse/factory.py
+++ b/dlt/destinations/impl/clickhouse/factory.py
@@ -29,7 +29,6 @@ def __init__(
         credentials: t.Union[ClickhouseCredentials, str, t.Dict[str, t.Any], Connection] = None,
         destination_name: str = None,
         environment: str = None,
-        create_indexes: bool = False,
         **kwargs: t.Any,
     ) -> None:
         """Configure the Clickhouse destination to use in a pipeline.
@@ -41,14 +40,11 @@ def __init__(
             credentials: Credentials to connect to the clickhouse database. Can be an instance of
                 `ClickhouseCredentials`, or a connection string in the format
                 `clickhouse://user:password@host:port/database`.
-            create_indexes: Maps directly to the `create_indexes` attribute of the
-                `ClickhouseClientConfiguration` object.
             **kwargs: Additional arguments passed to the destination config.
         """
         super().__init__(
             credentials=credentials,
             destination_name=destination_name,
             environment=environment,
-            create_indexes=create_indexes,
             **kwargs,
         )
diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index 87b5651668..1705c7c34e 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -29,7 +29,7 @@
     raise_database_error,
     raise_open_connection_error,
 )
-from dlt.destinations.typing import DBTransaction, DBApi, DBApiCursor
+from dlt.destinations.typing import DBTransaction, DBApi
 
 
 TRANSACTIONS_UNSUPPORTED_WARNING_MESSAGE = (
@@ -57,6 +57,8 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
         self._conn = clickhouse_driver.connect(dsn=self.credentials.to_native_representation())
         # TODO: Set timezone to UTC explicitly in each query.
# https://github.com/ClickHouse/ClickHouse/issues/699 + with self._conn.cursor() as curr: + curr.execute("set allow_experimental_object_type = 1;") return self._conn @raise_open_connection_error @@ -118,6 +120,19 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str: ) return f"{database_name}.{dataset_name}" + def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str: + database_name = ( + self.capabilities.escape_identifier(self.database_name) + if escape + else self.database_name + ) + dataset_table_name = ( + self.capabilities.escape_identifier(f"{self.dataset_name}_{table_name}") + if escape + else f"{self.dataset_name}_{table_name}" + ) + return f"{database_name}.{dataset_table_name}" + @classmethod def _make_database_exception(cls, ex: Exception) -> Exception: # type: ignore[return] if isinstance(ex, clickhouse_driver.dbapi.errors.OperationalError): diff --git a/tests/load/clickhouse/test_clickhouse_table_builder.py b/tests/load/clickhouse/test_clickhouse_table_builder.py index 72902c77c2..bf19759e39 100644 --- a/tests/load/clickhouse/test_clickhouse_table_builder.py +++ b/tests/load/clickhouse/test_clickhouse_table_builder.py @@ -27,21 +27,40 @@ def test_create_table(clickhouse_client: ClickhouseClient) -> None: statements = clickhouse_client._get_table_update_sql("event_test_table", TABLE_UPDATE, False) assert len(statements) == 1 sql = statements[0] - print(sql) - sqlfluff.parse(sql, dialect="clickhouse") + + # sqlfluff struggles with clickhouse's backtick escape characters. + # sqlfluff.parse(sql, dialect="clickhouse") assert sql.strip().startswith("CREATE TABLE") - assert "EVENT_TEST_TABLE" in sql - assert '"COL1" NUMBER(19,0) NOT NULL' in sql - assert '"COL2" FLOAT NOT NULL' in sql - assert '"COL3" BOOLEAN NOT NULL' in sql - assert '"COL4" TIMESTAMP_TZ NOT NULL' in sql - assert '"COL5" VARCHAR' in sql - assert '"COL6" NUMBER(38,9) NOT NULL' in sql - assert '"COL7" BINARY' in sql - assert '"COL8" NUMBER(38,0)' in sql - assert '"COL9" VARIANT NOT NULL' in sql - assert '"COL10" DATE NOT NULL' in sql + assert "event_test_table" in sql + assert "`col1` Int64" in sql + assert "`col2` Float64" in sql + assert "`col3` Boolean" in sql + assert "`col4` DateTime('UTC')" in sql + assert "`col5` String" in sql + assert "`col6` Decimal(38,9)" in sql + assert "`col7` String" in sql + assert "`col8` Decimal(76,0)" in sql + assert "`col9` String" in sql + assert "`col10` Date" in sql + assert "`col11` DateTime" in sql + assert "`col1_null` Nullable(Int64)" in sql + assert "`col2_null` Nullable(Float64)" in sql + assert "`col3_null` Nullable(Boolean)" in sql + assert "`col4_null` Nullable(DateTime('UTC'))" in sql + assert "`col5_null` Nullable(String)" in sql + assert "`col6_null` Nullable(Decimal(38,9))" in sql + assert "`col7_null` Nullable(String)" in sql + assert "`col8_null` Nullable(Decimal(76,0))" in sql + assert "`col9_null` Nullable(String)" in sql + assert "`col10_null` Nullable(Date)" in sql + assert "`col11_null` Nullable(DateTime)" in sql + assert "`col1_precision` Int64" in sql + assert "`col4_precision` DateTime(3, 'UTC')" in sql + assert "`col5_precision` String" in sql + assert "`col6_precision` Decimal(6,2)" in sql + assert "`col7_precision` String" in sql + assert "`col11_precision` DateTime" in sql def test_alter_table(clickhouse_client: ClickhouseClient) -> None: @@ -49,44 +68,58 @@ def test_alter_table(clickhouse_client: ClickhouseClient) -> None: assert len(statements) == 1 sql = statements[0] - # TODO: sqlfluff doesn't parse 
clickhouse multi ADD COLUMN clause correctly - # sqlfluff.parse(sql, dialect='clickhouse') + # sqlfluff struggles with clickhouse's backtick escape characters. + # sqlfluff.parse(sql, dialect="clickhouse") + # Alter table statements only accept `Nullable` modifiers. assert sql.startswith("ALTER TABLE") assert sql.count("ALTER TABLE") == 1 - assert sql.count("ADD COLUMN") == 1 - assert '"EVENT_TEST_TABLE"' in sql - assert '"COL1" NUMBER(19,0) NOT NULL' in sql - assert '"COL2" FLOAT NOT NULL' in sql - assert '"COL3" BOOLEAN NOT NULL' in sql - assert '"COL4" TIMESTAMP_TZ NOT NULL' in sql - assert '"COL5" VARCHAR' in sql - assert '"COL6" NUMBER(38,9) NOT NULL' in sql - assert '"COL7" BINARY' in sql - assert '"COL8" NUMBER(38,0)' in sql - assert '"COL9" VARIANT NOT NULL' in sql - assert '"COL10" DATE' in sql + assert "event_test_table" in sql + assert "`col1` Int64" in sql + assert "`col2` Float64" in sql + assert "`col3` Boolean" in sql + assert "`col4` DateTime('UTC')" in sql + assert "`col5` String" in sql + assert "`col6` Decimal(38,9)" in sql + assert "`col7` String" in sql + assert "`col8` Decimal(76,0)" in sql + assert "`col9` String" in sql + assert "`col10` Date" in sql + assert "`col11` DateTime" in sql + assert "`col1_null` Nullable(Int64)" in sql + assert "`col2_null` Nullable(Float64)" in sql + assert "`col3_null` Nullable(Boolean)" in sql + assert "`col4_null` Nullable(DateTime('UTC'))" in sql + assert "`col5_null` Nullable(String)" in sql + assert "`col6_null` Nullable(Decimal(38,9))" in sql + assert "`col7_null` Nullable(String)" in sql + assert "`col8_null` Nullable(Decimal(76,0))" in sql + assert "`col9_null` Nullable(String)" in sql + assert "`col10_null` Nullable(Date)" in sql + assert "`col11_null` Nullable(DateTime)" in sql + assert "`col1_precision` Int64" in sql + assert "`col4_precision` DateTime(3, 'UTC')" in sql + assert "`col5_precision` String" in sql + assert "`col6_precision` Decimal(6,2)" in sql + assert "`col7_precision` String" in sql + assert "`col11_precision` DateTime" in sql mod_table = deepcopy(TABLE_UPDATE) mod_table.pop(0) sql = clickhouse_client._get_table_update_sql("event_test_table", mod_table, True)[0] - assert '"COL1"' not in sql - assert '"COL2" FLOAT NOT NULL' in sql + assert "`col1`" not in sql + assert "`col2` Float64" in sql -def test_create_table_with_partition_and_cluster(clickhouse_client: ClickhouseClient) -> None: +@pytest.mark.usefixtures("empty_schema") +def test_create_table_with_primary_keys(clickhouse_client: ClickhouseClient) -> None: mod_update = deepcopy(TABLE_UPDATE) - # timestamp - mod_update[3]["partition"] = True - mod_update[4]["cluster"] = True - mod_update[1]["cluster"] = True + + mod_update[1]["primary_key"] = True + mod_update[4]["primary_key"] = True statements = clickhouse_client._get_table_update_sql("event_test_table", mod_update, False) assert len(statements) == 1 sql = statements[0] - # TODO: Can't parse cluster by - # sqlfluff.parse(sql, dialect="clickhouse") - - # clustering must be the last - assert sql.endswith('CLUSTER BY ("COL2","COL5")') + assert sql.endswith("PRIMARY KEY (`col2`, `col5`)")
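For reference, the corrected escaper in dlt/common/data_writers/escape.py now closes identifiers with a matching backtick (the previous version closed with a double quote) and doubles any embedded backtick. A quick standalone sanity check of the patched behavior (illustrative only, not part of the patch):

    def escape_clickhouse_identifier(v: str) -> str:
        return "`" + v.replace("`", "``").replace("\\", "\\\\") + "`"

    assert escape_clickhouse_identifier("col1") == "`col1`"
    assert escape_clickhouse_identifier("we`ird") == "`we``ird`"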
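_get_table_update_sql now resolves the engine name through TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR instead of interpolating the raw hint value, defaulting to "replicated_merge_tree". A minimal sketch of that lookup; the real dict's entries are not shown in this patch, so the mapping contents and the hint key below are assumptions:

    from typing import Dict

    # Assumed contents; the real dict lives in dlt/destinations/impl/clickhouse/clickhouse.py.
    TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[str, str] = {
        "merge_tree": "MergeTree",  # assumed entry
        "replicated_merge_tree": "ReplicatedMergeTree",  # assumed entry
    }

    def engine_clause(table: dict) -> str:
        # Fall back to 'replicated_merge_tree' when no engine hint is present,
        # mirroring the default in _get_table_update_sql.
        engine_type = table.get("table_engine_type_hint", "replicated_merge_tree")  # hypothetical hint key
        return f"ENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(engine_type)}"

    assert engine_clause({}) == "ENGINE = ReplicatedMergeTree"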
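The column-definition changes hinge on ClickHouse expressing nullability through the Nullable(...) type wrapper rather than trailing NULL / NOT NULL modifiers, which is also the only form ALTER TABLE ... ADD COLUMN accepts. A minimal sketch of that rule (hypothetical helper, not part of the patch):

    def wrap_nullable(db_type: str, nullable: bool) -> str:
        # ClickHouse columns are NOT NULL by default; a nullable column
        # must be declared via the Nullable(...) type wrapper.
        return f"Nullable({db_type})" if nullable else db_type

    assert wrap_nullable("Int64", True) == "Nullable(Int64)"
    assert wrap_nullable("DateTime('UTC')", False) == "DateTime('UTC')"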
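ClickHouse also has no schema level between database and table, so the new make_qualified_table_name override folds the dataset name into the table name as a prefix. A sketch of the resulting shape, using hypothetical names and the backtick escaper from above:

    def qualified_table_name(database: str, dataset: str, table: str) -> str:
        # Mirrors ClickhouseSqlClient.make_qualified_table_name: the dataset acts
        # as a table-name prefix because ClickHouse has no schemas.
        return (
            f"{escape_clickhouse_identifier(database)}"
            f".{escape_clickhouse_identifier(f'{dataset}_{table}')}"
        )

    # "dlt_db" and "event" are illustrative example values.
    assert qualified_table_name("dlt_db", "event", "test_table") == "`dlt_db`.`event_test_table`"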