Pass basic tests #1055
Signed-off-by: Marcel Coetzee <[email protected]>
Pipboyguy committed Mar 19, 2024
1 parent 29b5a07 commit c9b1ae4
Showing 7 changed files with 128 additions and 69 deletions.
2 changes: 1 addition & 1 deletion dlt/common/data_writers/escape.py
@@ -174,4 +174,4 @@ def escape_clickhouse_literal(v: Any) -> Any:


def escape_clickhouse_identifier(v: str) -> str:
return "`" + v.replace("`", "``").replace("\\", "\\\\") + '"'
return "`" + v.replace("`", "``").replace("\\", "\\\\") + "`"
4 changes: 2 additions & 2 deletions dlt/destinations/impl/clickhouse/__init__.py
@@ -7,9 +7,9 @@ def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()

caps.preferred_loader_file_format = "jsonl"
caps.supported_loader_file_formats = ["jsonl"]
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["jsonl"]
caps.supported_staging_file_formats = ["jsonl", "parquet"]

caps.escape_identifier = escape_clickhouse_identifier
caps.escape_literal = escape_clickhouse_literal
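
With parquet accepted for both loading and staging, a pipeline can request it explicitly. A minimal sketch assuming the standard dlt pipeline API; pipeline, dataset, and table names are hypothetical:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="clickhouse_demo",
    destination="clickhouse",
    dataset_name="demo_data",
)
# "parquet" is now a valid loader file format alongside "jsonl".
pipeline.run([{"id": 1, "name": "anna"}], table_name="players", loader_file_format="parquet")
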
53 changes: 37 additions & 16 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -1,6 +1,7 @@
import logging
import os
from copy import deepcopy
from typing import ClassVar, Optional, Dict, List, Sequence
from typing import ClassVar, Optional, Dict, List, Sequence, cast
from urllib.parse import urlparse

from dlt.common.configuration.specs import (
@@ -41,6 +42,8 @@

HINT_TO_CLICKHOUSE_ATTR: Dict[TColumnHint, str] = {
"primary_key": "PRIMARY KEY",
"unique": "", # No unique constraints available in Clickhouse.
"foreign_key": "", # No foreign key constraints support in Clickhouse.
}

TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR: Dict[TTableEngineType, str] = {
@@ -51,12 +54,13 @@

class ClickhouseTypeMapper(TypeMapper):
sct_to_unbound_dbt = {
"complex": "JSON",
"complex": "String",
"text": "String",
"double": "Float64",
"bool": "Boolean",
"date": "Date",
"timestamp": "DateTime",
"timestamp": "DateTime('UTC')",
"time": "Time('UTC')",
"bigint": "Int64",
"binary": "String",
"wei": "Decimal",
@@ -65,7 +69,8 @@ class ClickhouseTypeMapper(TypeMapper):
sct_to_dbt = {
"decimal": "Decimal(%i,%i)",
"wei": "Decimal(%i,%i)",
"timestamp": "DateTime(%i)",
"timestamp": "DateTime(%i, 'UTC')",
"time": "Time(%i ,'UTC')",
}
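
The two tables combine in the usual TypeMapper fashion: the parameterized sct_to_dbt entry applies when the column carries a precision, otherwise the unbound entry is used. A simplified sketch of that resolution, not the actual TypeMapper implementation:

from typing import Optional

sct_to_unbound = {"timestamp": "DateTime('UTC')"}
sct_to_bound = {"timestamp": "DateTime(%i, 'UTC')"}

def resolve(data_type: str, precision: Optional[int] = None) -> str:
    # Use the parameterized form only when a precision is given.
    if precision is not None and data_type in sct_to_bound:
        return sct_to_bound[data_type] % precision
    return sct_to_unbound[data_type]

assert resolve("timestamp") == "DateTime('UTC')"
assert resolve("timestamp", 3) == "DateTime(3, 'UTC')"
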

dbt_to_sct = {
Expand All @@ -74,6 +79,9 @@ class ClickhouseTypeMapper(TypeMapper):
"Boolean": "bool",
"Date": "date",
"DateTime": "timestamp",
"DateTime('UTC')": "timestamp",
"Time": "timestamp",
"Time('UTC')": "timestamp",
"Int64": "bigint",
"JSON": "complex",
"Decimal": "decimal",
@@ -190,7 +198,7 @@ def __init__(
)
super().__init__(schema, config, self.sql_client)
self.config: ClickhouseClientConfiguration = config
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR) if self.config.create_indexes else {}
self.active_hints = deepcopy(HINT_TO_CLICKHOUSE_ATTR)
self.type_mapper = ClickhouseTypeMapper(self.capabilities)

def start_file_load(self, table: TTableSchema, file_path: str, load_id: str) -> LoadJob:
@@ -213,10 +221,11 @@ def _get_table_update_sql(
return sql

# Default to 'ReplicatedMergeTree' if the user didn't explicitly set a table engine hint.
# 'ReplicatedMergeTree' is the only supported engine for Clickhouse Cloud.
sql[0] = f"{sql[0]}\nENGINE = {table.get(TABLE_ENGINE_TYPE_HINT, 'replicated_merge_tree')}"
table_type = cast(
TTableEngineType, table.get(TABLE_ENGINE_TYPE_HINT, "replicated_merge_tree")
)
sql[0] = f"{sql[0]}\nENGINE = {TABLE_ENGINE_TYPE_TO_CLICKHOUSE_ATTR.get(table_type)}"

# TODO: Remove `unique` and `primary_key` default implementations.
if primary_key_list := [
self.capabilities.escape_identifier(c["name"])
for c in new_columns
@@ -226,26 +235,38 @@
else:
sql[0] += "\nPRIMARY KEY tuple()"

# TODO: Apply sort order and cluster key hints.

return sql
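
Putting the engine clause and the primary key clause together, the generated CREATE TABLE takes roughly this shape. Illustrative only: the identifiers are hypothetical, and the engine name assumes the 'replicated_merge_tree' hint resolves to ReplicatedMergeTree through the lookup table:

expected_ddl = (
    "CREATE TABLE `db`.`dataset_events` (\n"
    "`id` Int64,\n"
    "`name` Nullable(String))\n"
    "ENGINE = ReplicatedMergeTree\n"
    "PRIMARY KEY (`id`)"  # "PRIMARY KEY tuple()" when no primary_key hint is set
)
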

def _get_column_def_sql(self, c: TColumnSchema, table_format: TTableFormat = None) -> str:
# The primary key definition is defined outside column specification.
# Build column definition.
# The primary key and sort order definition is defined outside column specification.
hints_str = " ".join(
self.active_hints.get(hint, "")
self.active_hints.get(hint)
for hint in self.active_hints.keys()
if c.get(hint, False) is True and hint != "primary_key"
if c.get(hint, False) is True
and hint not in ("primary_key", "sort")
and hint in self.active_hints
)

# Alter table statements only accept `Nullable` modifiers.
type_with_nullability_modifier = (
f"Nullable({self.type_mapper.to_db_type(c)})"
if c.get("nullable", True)
else self.type_mapper.to_db_type(c)
)

return (
f"{self.capabilities.escape_identifier(c['name'])} "
f"{self.type_mapper.to_db_type(c)} "
f"{hints_str} "
f"{self._gen_not_null(c.get('nullable', True))}"
f"{self.capabilities.escape_identifier(c['name'])} {type_with_nullability_modifier} {hints_str}"
.strip()
)

# Clickhouse fields are not nullable by default.
@staticmethod
def _gen_not_null(v: bool) -> str:
return "NULL" if v else "NOT NULL"
# We use the `Nullable` modifier instead of NULL / NOT NULL modifiers to cater for ALTER statements.
pass
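
The nullability rule in isolation, as a standalone sketch: ClickHouse columns are non-nullable unless wrapped in Nullable(...), and the wrapped form is the one ALTER TABLE accepts:

def column_def(name: str, db_type: str, nullable: bool) -> str:
    # Wrap nullable columns instead of appending NULL / NOT NULL modifiers.
    type_str = f"Nullable({db_type})" if nullable else db_type
    return f"`{name}` {type_str}"

assert column_def("col1", "Int64", False) == "`col1` Int64"
assert column_def("col5", "String", True) == "`col5` Nullable(String)"
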

def _from_db_type(
self, ch_t: str, precision: Optional[int], scale: Optional[int]
6 changes: 0 additions & 6 deletions dlt/destinations/impl/clickhouse/configuration.py
@@ -63,11 +63,6 @@ class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration
# but they do not enforce uniqueness constraints. It permits duplicate values even for the primary key
# columns within the same granule.
# See: https://clickhouse.com/docs/en/optimize/sparse-primary-indexes
create_indexes: bool = True
"""Whether `primary_key` column hint is applied. Note that Clickhouse has no unique constraint,
and primary keys don't guarantee uniqueness."""

__config_gen_annotations__: ClassVar[List[str]] = ["create_indexes"]

def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string."""
@@ -82,7 +77,6 @@ def __init__(
*,
credentials: ClickhouseCredentials = None,
dataset_name: str = None,
create_indexes: bool = True,
destination_name: str = None,
environment: str = None
) -> None:
4 changes: 0 additions & 4 deletions dlt/destinations/impl/clickhouse/factory.py
@@ -29,7 +29,6 @@ def __init__(
credentials: t.Union[ClickhouseCredentials, str, t.Dict[str, t.Any], Connection] = None,
destination_name: str = None,
environment: str = None,
create_indexes: bool = False,
**kwargs: t.Any,
) -> None:
"""Configure the Clickhouse destination to use in a pipeline.
@@ -41,14 +40,11 @@
credentials: Credentials to connect to the clickhouse database.
Can be an instance of `ClickhouseCredentials`, or a connection string
in the format `clickhouse://user:password@host:port/database`.
create_indexes: Maps directly to the `create_indexes` attribute of the
`ClickhouseClientConfiguration` object.
**kwargs: Additional arguments passed to the destination config.
"""
super().__init__(
credentials=credentials,
destination_name=destination_name,
environment=environment,
create_indexes=create_indexes,
**kwargs,
)
17 changes: 16 additions & 1 deletion dlt/destinations/impl/clickhouse/sql_client.py
@@ -29,7 +29,7 @@
raise_database_error,
raise_open_connection_error,
)
from dlt.destinations.typing import DBTransaction, DBApi, DBApiCursor
from dlt.destinations.typing import DBTransaction, DBApi


TRANSACTIONS_UNSUPPORTED_WARNING_MESSAGE = (
@@ -57,6 +58,8 @@ def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
self._conn = clickhouse_driver.connect(dsn=self.credentials.to_native_representation())
# TODO: Set timezone to UTC explicitly in each query.
# https://github.com/ClickHouse/ClickHouse/issues/699
with self._conn.cursor() as curr:
curr.execute("set allow_experimental_object_type = 1;")
return self._conn
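
The session setting enables ClickHouse's experimental Object('json') type for this connection. One way to verify it took effect, as a sketch assuming a live DB-API connection named `conn`:

with conn.cursor() as cur:
    cur.execute("SELECT getSetting('allow_experimental_object_type')")
    print(cur.fetchone())  # expected: (1,)
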

@raise_open_connection_error
@@ -118,6 +120,19 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str:
)
return f"{database_name}.{dataset_name}"

def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str:
database_name = (
self.capabilities.escape_identifier(self.database_name)
if escape
else self.database_name
)
dataset_table_name = (
self.capabilities.escape_identifier(f"{self.dataset_name}_{table_name}")
if escape
else f"{self.dataset_name}_{table_name}"
)
return f"{database_name}.{dataset_table_name}"

@classmethod
def _make_database_exception(cls, ex: Exception) -> Exception: # type: ignore[return]
if isinstance(ex, clickhouse_driver.dbapi.errors.OperationalError):
111 changes: 72 additions & 39 deletions tests/load/clickhouse/test_clickhouse_table_builder.py
@@ -27,66 +27,99 @@ def test_create_table(clickhouse_client: ClickhouseClient) -> None:
statements = clickhouse_client._get_table_update_sql("event_test_table", TABLE_UPDATE, False)
assert len(statements) == 1
sql = statements[0]
print(sql)
sqlfluff.parse(sql, dialect="clickhouse")

# sqlfluff struggles with clickhouse's backtick escape characters.
# sqlfluff.parse(sql, dialect="clickhouse")

assert sql.strip().startswith("CREATE TABLE")
assert "EVENT_TEST_TABLE" in sql
assert '"COL1" NUMBER(19,0) NOT NULL' in sql
assert '"COL2" FLOAT NOT NULL' in sql
assert '"COL3" BOOLEAN NOT NULL' in sql
assert '"COL4" TIMESTAMP_TZ NOT NULL' in sql
assert '"COL5" VARCHAR' in sql
assert '"COL6" NUMBER(38,9) NOT NULL' in sql
assert '"COL7" BINARY' in sql
assert '"COL8" NUMBER(38,0)' in sql
assert '"COL9" VARIANT NOT NULL' in sql
assert '"COL10" DATE NOT NULL' in sql
assert "event_test_table" in sql
assert "`col1` Int64" in sql
assert "`col2` Float64" in sql
assert "`col3` Boolean" in sql
assert "`col4` DateTime('UTC')" in sql
assert "`col5` String" in sql
assert "`col6` Decimal(38,9)" in sql
assert "`col7` String" in sql
assert "`col8` Decimal(76,0)" in sql
assert "`col9` String" in sql
assert "`col10` Date" in sql
assert "`col11` DateTime" in sql
assert "`col1_null` Nullable(Int64)" in sql
assert "`col2_null` Nullable(Float64)" in sql
assert "`col3_null` Nullable(Boolean)" in sql
assert "`col4_null` Nullable(DateTime('UTC'))" in sql
assert "`col5_null` Nullable(String)" in sql
assert "`col6_null` Nullable(Decimal(38,9))" in sql
assert "`col7_null` Nullable(String)" in sql
assert "`col8_null` Nullable(Decimal(76,0))" in sql
assert "`col9_null` Nullable(String)" in sql
assert "`col10_null` Nullable(Date)" in sql
assert "`col11_null` Nullable(DateTime)" in sql
assert "`col1_precision` Int64" in sql
assert "`col4_precision` DateTime(3, 'UTC')" in sql
assert "`col5_precision` String" in sql
assert "`col6_precision` Decimal(6,2)" in sql
assert "`col7_precision` String" in sql
assert "`col11_precision` DateTime" in sql


def test_alter_table(clickhouse_client: ClickhouseClient) -> None:
statements = clickhouse_client._get_table_update_sql("event_test_table", TABLE_UPDATE, True)
assert len(statements) == 1
sql = statements[0]

# TODO: sqlfluff doesn't parse clickhouse multi ADD COLUMN clause correctly
# sqlfluff.parse(sql, dialect='clickhouse')
# sqlfluff struggles with clickhouse's backtick escape characters.
# sqlfluff.parse(sql, dialect="clickhouse")

# Alter table statements only accept `Nullable` modifiers.
assert sql.startswith("ALTER TABLE")
assert sql.count("ALTER TABLE") == 1
assert sql.count("ADD COLUMN") == 1
assert '"EVENT_TEST_TABLE"' in sql
assert '"COL1" NUMBER(19,0) NOT NULL' in sql
assert '"COL2" FLOAT NOT NULL' in sql
assert '"COL3" BOOLEAN NOT NULL' in sql
assert '"COL4" TIMESTAMP_TZ NOT NULL' in sql
assert '"COL5" VARCHAR' in sql
assert '"COL6" NUMBER(38,9) NOT NULL' in sql
assert '"COL7" BINARY' in sql
assert '"COL8" NUMBER(38,0)' in sql
assert '"COL9" VARIANT NOT NULL' in sql
assert '"COL10" DATE' in sql
assert "event_test_table" in sql
assert "`col1` Int64" in sql
assert "`col2` Float64" in sql
assert "`col3` Boolean" in sql
assert "`col4` DateTime('UTC')" in sql
assert "`col5` String" in sql
assert "`col6` Decimal(38,9)" in sql
assert "`col7` String" in sql
assert "`col8` Decimal(76,0)" in sql
assert "`col9` String" in sql
assert "`col10` Date" in sql
assert "`col11` DateTime" in sql
assert "`col1_null` Nullable(Int64)" in sql
assert "`col2_null` Nullable(Float64)" in sql
assert "`col3_null` Nullable(Boolean)" in sql
assert "`col4_null` Nullable(DateTime('UTC'))" in sql
assert "`col5_null` Nullable(String)" in sql
assert "`col6_null` Nullable(Decimal(38,9))" in sql
assert "`col7_null` Nullable(String)" in sql
assert "`col8_null` Nullable(Decimal(76,0))" in sql
assert "`col9_null` Nullable(String)" in sql
assert "`col10_null` Nullable(Date)" in sql
assert "`col11_null` Nullable(DateTime)" in sql
assert "`col1_precision` Int64" in sql
assert "`col4_precision` DateTime(3, 'UTC')" in sql
assert "`col5_precision` String" in sql
assert "`col6_precision` Decimal(6,2)" in sql
assert "`col7_precision` String" in sql
assert "`col11_precision` DateTime" in sql

mod_table = deepcopy(TABLE_UPDATE)
mod_table.pop(0)
sql = clickhouse_client._get_table_update_sql("event_test_table", mod_table, True)[0]

assert '"COL1"' not in sql
assert '"COL2" FLOAT NOT NULL' in sql
assert "`col1`" not in sql
assert "`col2` Float64" in sql


def test_create_table_with_partition_and_cluster(clickhouse_client: ClickhouseClient) -> None:
@pytest.mark.usefixtures("empty_schema")
def test_create_table_with_primary_keys(clickhouse_client: ClickhouseClient) -> None:
mod_update = deepcopy(TABLE_UPDATE)
# timestamp
mod_update[3]["partition"] = True
mod_update[4]["cluster"] = True
mod_update[1]["cluster"] = True

mod_update[1]["primary_key"] = True
mod_update[4]["primary_key"] = True
statements = clickhouse_client._get_table_update_sql("event_test_table", mod_update, False)
assert len(statements) == 1
sql = statements[0]

# TODO: Can't parse cluster by
# sqlfluff.parse(sql, dialect="clickhouse")

# clustering must be the last
assert sql.endswith('CLUSTER BY ("COL2","COL5")')
assert sql.endswith("PRIMARY KEY (`col2`, `col5`)")
