From 463ca1dfee849549956b8dcd3d0ba3b782289e59 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 10 Apr 2024 23:57:01 +0200
Subject: [PATCH 1/4] auto compression for parquet, detects compression of local files

---
 dlt/destinations/impl/clickhouse/clickhouse.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/dlt/destinations/impl/clickhouse/clickhouse.py b/dlt/destinations/impl/clickhouse/clickhouse.py
index c4e7b9c02e..259c0b12d2 100644
--- a/dlt/destinations/impl/clickhouse/clickhouse.py
+++ b/dlt/destinations/impl/clickhouse/clickhouse.py
@@ -174,7 +174,12 @@ def __init__(
         file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
         clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]

-        compression = "none" if config.get("data_writer.disable_compression") else "gz"
+        if file_extension == "parquet":
+            # Auto works for parquet
+            compression = "auto"
+        else:
+            # It does not work for json
+            compression = "none" if config.get("data_writer.disable_compression") else "gz"

         statement: str = ""

@@ -193,7 +198,6 @@ def __init__(
                 access_key_id = None
                 secret_access_key = None

-            clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
             structure = "auto"

             template = Template("""
@@ -234,6 +238,10 @@ def __init__(
             statement = f"INSERT INTO {qualified_table_name} {table_function}"
         elif not bucket_path:
             # Local filesystem.
+            if file_extension == "parquet":
+                compression = "auto"
+            else:
+                compression = "gz" if FileStorage.is_gzipped(file_path) else "none"
             try:
                 with clickhouse_connect.create_client(
                     host=client.credentials.host,
@@ -252,7 +260,7 @@ def __init__(
                         "allow_experimental_lightweight_delete": 1,
                         "allow_experimental_object_type": 1,
                     },
-                    compression=None if compression == "none" else compression,
+                    compression=compression,
                 )
             except clickhouse_connect.driver.exceptions.Error as e:
                 raise LoadJobTerminalException(
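Note on the compression handling above: parquet files carry their own codec metadata, so the job passes "auto" and lets ClickHouse work it out, while for jsonl the choice depends on the writer configuration (staging path) or on whether the local file is actually gzipped. A minimal, self-contained sketch of that selection logic, with a stand-in for dlt's `FileStorage.is_gzipped` (the helper name and the magic-byte check are illustrative assumptions, not the dlt implementation):

    from typing import Literal

    def guess_compression(file_path: str, file_extension: str) -> Literal["auto", "gz", "none"]:
        if file_extension == "parquet":
            # Parquet declares its codec per column chunk, so ClickHouse can detect it.
            return "auto"
        # For jsonl, look for the gzip magic bytes (0x1f 0x8b) at the start of the file.
        with open(file_path, "rb") as f:
            return "gz" if f.read(2) == b"\x1f\x8b" else "none"

For a plain-text `items.jsonl` this returns "none", while the same file gzipped returns "gz", which mirrors the local-filesystem branch added in this patch.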
From e7e5925be85f74b48f6ad31a2e4b095a560708d7 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 10 Apr 2024 23:57:59 +0200
Subject: [PATCH 2/4] fixes has_dataset, recognizes more exceptions

---
 .../impl/clickhouse/sql_client.py | 36 +++++++++++--------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/dlt/destinations/impl/clickhouse/sql_client.py b/dlt/destinations/impl/clickhouse/sql_client.py
index fd3c9a401b..5710e41e49 100644
--- a/dlt/destinations/impl/clickhouse/sql_client.py
+++ b/dlt/destinations/impl/clickhouse/sql_client.py
@@ -3,6 +3,7 @@
     Iterator,
     AnyStr,
     Any,
+    List,
     Optional,
     Sequence,
     ClassVar,
@@ -44,7 +45,7 @@ class ClickHouseDBApiCursorImpl(DBApiCursorImpl):
 class ClickHouseSqlClient(
     SqlClientBase[clickhouse_driver.dbapi.connection.Connection], DBTransaction
 ):
-    dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi.connection.Connection
+    dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi
     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

     def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> None:
@@ -54,7 +55,7 @@ def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> Non
         self.database_name = credentials.database

     def has_dataset(self) -> bool:
-        return super().has_dataset()
+        return len(self._list_tables()) > 0

     def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
         self._conn = clickhouse_driver.dbapi.connect(
@@ -98,7 +99,17 @@ def create_dataset(self) -> None:
     def drop_dataset(self) -> None:
         # Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema,
         # or collection of tables, that has the `dataset_name` as a prefix.
-        to_drop_results = self.execute_sql(
+        to_drop_results = self._list_tables()
+        for table in to_drop_results:
+            # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
+            # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
+            # To resolve this, we are forced to provide the full query string here.
+            self.execute_sql(
+                f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
+            )
+
+    def _list_tables(self) -> List[str]:
+        rows = self.execute_sql(
             """
             SELECT name
             FROM system.tables
@@ -110,14 +121,7 @@ def drop_dataset(self) -> None:
                 f"{self.dataset_name}%",
             ),
         )
-        for to_drop_result in to_drop_results:
-            table = to_drop_result[0]
-            # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
-            # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
-            # To resolve this, we are forced to provide the full query string here.
-            self.execute_sql(
-                f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
-            )
+        return [row[0] for row in rows]

     @contextmanager
     @raise_database_error
@@ -172,12 +176,14 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str
         return f"{database_name}.{dataset_and_table}"

     @classmethod
-    def _make_database_exception(cls, ex: Exception) -> Exception:  # type: ignore[return]
+    def _make_database_exception(cls, ex: Exception) -> Exception:
         if isinstance(ex, clickhouse_driver.dbapi.errors.OperationalError):
-            if "Code: 57." in str(ex) or "Code: 82." in str(ex):
-                raise DatabaseTerminalException(ex)
+            if "Code: 57." in str(ex) or "Code: 82." in str(ex) or "Code: 47." in str(ex):
+                return DatabaseTerminalException(ex)
             elif "Code: 60." in str(ex) or "Code: 81." in str(ex):
-                raise DatabaseUndefinedRelation(ex)
+                return DatabaseUndefinedRelation(ex)
+            else:
+                return DatabaseTransientException(ex)
         elif isinstance(
             ex,
             (
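Note on the error mapping above: the rewritten `_make_database_exception` now returns the wrapped exception instead of raising it, and it classifies by the "Code: N." marker that clickhouse_driver puts into the message. As of current ClickHouse releases, codes 57 and 82 mean a table or database already exists, 47 is an unknown identifier (for example a missing column), and 60/81 mean unknown table or database; anything else on an `OperationalError` falls through as transient. A standalone sketch of the same string matching (the function and constants below are written for this note, not taken from dlt):

    TERMINAL_CODES = (57, 82, 47)        # already exists / unknown identifier
    UNDEFINED_RELATION_CODES = (60, 81)  # unknown table / unknown database

    def classify_operational_error(message: str) -> str:
        # Classify a driver error message by the ClickHouse error code it carries.
        if any(f"Code: {code}." in message for code in TERMINAL_CODES):
            return "terminal"
        if any(f"Code: {code}." in message for code in UNDEFINED_RELATION_CODES):
            return "undefined_relation"
        return "transient"

    assert classify_operational_error("Code: 60. DB::Exception: table does not exist") == "undefined_relation"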
From 871aa4a21dfce689764c81c360b061b944defefa Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Wed, 10 Apr 2024 23:58:15 +0200
Subject: [PATCH 3/4] fixes some tests

---
 tests/load/pipeline/test_arrow_loading.py | 11 +++++++++--
 tests/load/test_sql_client.py             | 11 +++++++----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/tests/load/pipeline/test_arrow_loading.py b/tests/load/pipeline/test_arrow_loading.py
index 82ccb24bf1..294c479e94 100644
--- a/tests/load/pipeline/test_arrow_loading.py
+++ b/tests/load/pipeline/test_arrow_loading.py
@@ -1,11 +1,11 @@
 from datetime import datetime  # noqa: I251
-from typing import Any, Union, List, Dict, Tuple, Literal
 import os

 import pytest
 import numpy as np
 import pyarrow as pa
 import pandas as pd
+import base64

 import dlt
 from dlt.common import pendulum
@@ -42,6 +42,7 @@ def test_load_arrow_item(
         "redshift",
         "databricks",
         "synapse",
+        "clickhouse",
     )  # athena/redshift can't load TIME columns
     include_binary = not (
         destination_config.destination in ("redshift", "databricks")
@@ -102,11 +103,17 @@ def some_data():
                 row[i] = row[i].tobytes()

     if destination_config.destination == "redshift":
-        # Binary columns are hex formatted in results
+        # Redshift needs hex string
         for record in records:
             if "binary" in record:
                 record["binary"] = record["binary"].hex()

+    if destination_config.destination == "clickhouse":
+        # Clickhouse needs base64 string
+        for record in records:
+            if "binary" in record:
+                record["binary"] = base64.b64encode(record["binary"]).decode("ascii")
+
     for row in rows:
         for i in range(len(row)):
             if isinstance(row[i], datetime):
diff --git a/tests/load/test_sql_client.py b/tests/load/test_sql_client.py
index bd1ec5ba43..e9ddddcbe0 100644
--- a/tests/load/test_sql_client.py
+++ b/tests/load/test_sql_client.py
@@ -369,7 +369,7 @@ def test_database_exceptions(client: SqlJobClientBase) -> None:
         with client.sql_client.execute_query(f"DELETE FROM {qualified_name} WHERE 1=1"):
             pass
         assert client.sql_client.is_dbapi_exception(term_ex.value.dbapi_exception)
-    if client.config.destination_type != "dremio":
+    if client.config.destination_type not in ["dremio", "clickhouse"]:
         with pytest.raises(DatabaseUndefinedRelation) as term_ex:
             with client.sql_client.execute_query("DROP SCHEMA UNKNOWN"):
                 pass
@@ -630,18 +630,21 @@ def assert_load_id(sql_client: SqlClientBase[TNativeConn], load_id: str) -> None
 def prepare_temp_table(client: SqlJobClientBase) -> str:
     uniq_suffix = uniq_id()
     table_name = f"tmp_{uniq_suffix}"
-    iceberg_table_suffix = ""
+    ddl_suffix = ""
     coltype = "numeric"
     if client.config.destination_type == "athena":
-        iceberg_table_suffix = (
+        ddl_suffix = (
             f"LOCATION '{AWS_BUCKET}/ci/{table_name}' TBLPROPERTIES ('table_type'='ICEBERG',"
             " 'format'='parquet');"
         )
         coltype = "bigint"
         qualified_table_name = table_name
+    elif client.config.destination_type == "clickhouse":
+        ddl_suffix = "ENGINE = MergeTree() ORDER BY col"
+        qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
     else:
         qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
     client.sql_client.execute_sql(
-        f"CREATE TABLE {qualified_table_name} (col {coltype}) {iceberg_table_suffix};"
+        f"CREATE TABLE {qualified_table_name} (col {coltype}) {ddl_suffix};"
     )
     return table_name
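Note on the binary comparison above: with this change the values read back from ClickHouse are treated as base64-encoded text, so the test encodes its expected bytes the same way before comparing (Redshift keeps its existing hex treatment). A tiny sketch of that adjustment, using made-up sample bytes:

    import base64

    # Encode the expected value the same way the destination returns it.
    record = {"binary": b"\x00\x01\x02"}
    record["binary"] = base64.b64encode(record["binary"]).decode("ascii")
    assert record["binary"] == "AAEC"

The `prepare_temp_table` change follows the same shape as the existing Athena branch: ClickHouse just needs its own DDL suffix (`ENGINE = MergeTree() ORDER BY col`), since a MergeTree table cannot be created without an engine and an ordering key.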
From 8ed4919fdcf4c4f2b7ecb5aa3e34daf5174470e9 Mon Sep 17 00:00:00 2001
From: Marcin Rudolf
Date: Thu, 11 Apr 2024 00:21:17 +0200
Subject: [PATCH 4/4] aligns clickhouse config with dataclasses

---
 .../impl/clickhouse/configuration.py | 23 +++----------------
 .../test_clickhouse_table_builder.py |  2 +-
 2 files changed, 4 insertions(+), 21 deletions(-)

diff --git a/dlt/destinations/impl/clickhouse/configuration.py b/dlt/destinations/impl/clickhouse/configuration.py
index 075150538d..34bac5b43b 100644
--- a/dlt/destinations/impl/clickhouse/configuration.py
+++ b/dlt/destinations/impl/clickhouse/configuration.py
@@ -73,9 +73,9 @@ def to_url(self) -> URL:

 @configspec
 class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
-    destination_type: Final[str] = "clickhouse"  # type: ignore[misc]
-    credentials: ClickHouseCredentials  # type: ignore
-    dataset_name: Final[str] = ""  # type: ignore
+    destination_type: Final[str] = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False)  # type: ignore[misc]
+    credentials: ClickHouseCredentials = None
+    dataset_name: Final[str] = dataclasses.field(default="", init=False, repr=False, compare=False)  # type: ignore[misc]
     """dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix"""

     # Primary key columns are used to build a sparse primary index which allows for efficient data retrieval,
@@ -88,20 +88,3 @@ def fingerprint(self) -> str:
         if self.credentials and self.credentials.host:
             return digest128(self.credentials.host)
         return ""
-
-    if TYPE_CHECKING:
-
-        def __init__(
-            self,
-            *,
-            credentials: ClickHouseCredentials = None,
-            dataset_name: str = None,
-            destination_name: str = None,
-            environment: str = None
-        ) -> None:
-            super().__init__(
-                credentials=credentials,
-                destination_name=destination_name,
-                environment=environment,
-            )
-            ...
diff --git a/tests/load/clickhouse/test_clickhouse_table_builder.py b/tests/load/clickhouse/test_clickhouse_table_builder.py
index 9d3fadfc47..9db87dc233 100644
--- a/tests/load/clickhouse/test_clickhouse_table_builder.py
+++ b/tests/load/clickhouse/test_clickhouse_table_builder.py
@@ -20,7 +20,7 @@ def clickhouse_client(empty_schema: Schema) -> ClickHouseClient:
     creds = ClickHouseCredentials()
     return ClickHouseClient(
         empty_schema,
-        ClickHouseClientConfiguration(dataset_name=f"test_{uniq_id()}", credentials=creds),
+        ClickHouseClientConfiguration(credentials=creds)._bind_dataset_name(f"test_{uniq_id()}"),
     )
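Note on the configuration change above: declaring the fixed fields through `dataclasses.field(..., init=False, repr=False, compare=False)` keeps them out of the generated constructor, which is why the table-builder test now builds the configuration with credentials only and attaches the dataset name afterwards via `_bind_dataset_name`, and why the hand-written `TYPE_CHECKING` constructor stub could be deleted. A minimal, self-contained illustration of the dataclass pattern (the class below is a made-up example, not the dlt configspec):

    import dataclasses
    from typing import Optional

    @dataclasses.dataclass
    class ExampleConfig:
        credentials: Optional[str] = None  # stand-in for ClickHouseCredentials
        # Fixed per class: excluded from __init__, __repr__ and comparisons.
        destination_type: str = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False)

    cfg = ExampleConfig(credentials="clickhouse://...")
    assert cfg.destination_type == "clickhouse"
    assert "destination_type" not in repr(cfg)

Marking such fields `init=False` is what lets per-destination constants behave like class-level defaults while the rest of the configuration stays a plain dataclass.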