Merge remote-tracking branch 'origin/1055-implement-clickhouse-destination' into 1055-implement-clickhouse-destination
Pipboyguy committed Apr 11, 2024
2 parents 3f20381 + 8ed4919 commit b13942b
Showing 6 changed files with 52 additions and 45 deletions.
14 changes: 11 additions & 3 deletions dlt/destinations/impl/clickhouse/clickhouse.py
@@ -174,7 +174,12 @@ def __init__(
 
         file_extension = cast(SUPPORTED_FILE_FORMATS, file_extension)
         clickhouse_format: str = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
-        compression = "none" if config.get("data_writer.disable_compression") else "gz"
+        if file_extension == "parquet":
+            # Auto works for parquet
+            compression = "auto"
+        else:
+            # It does not work for json
+            compression = "none" if config.get("data_writer.disable_compression") else "gz"
 
         statement: str = ""
 
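
Note: ClickHouse can detect the compression of Parquet input on its own, while for JSONL the loader must state explicitly whether the writer gzipped the file. A minimal sketch of the selection logic in isolation (the function name and boolean parameter are illustrative, not dlt API):

    def select_compression(file_extension: str, disable_compression: bool) -> str:
        # ClickHouse sniffs compression from Parquet files itself ("auto") ...
        if file_extension == "parquet":
            return "auto"
        # ... but not from JSONL, so mirror what the data writer produced.
        return "none" if disable_compression else "gz"

    assert select_compression("parquet", disable_compression=False) == "auto"
    assert select_compression("jsonl", disable_compression=True) == "none"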
@@ -193,7 +198,6 @@ def __init__(
             access_key_id = None
             secret_access_key = None
 
-            clickhouse_format = FILE_FORMAT_TO_TABLE_FUNCTION_MAPPING[file_extension]
             structure = "auto"
 
             template = Template("""
@@ -234,6 +238,10 @@ def __init__(
             statement = f"INSERT INTO {qualified_table_name} {table_function}"
         elif not bucket_path:
             # Local filesystem.
+            if file_extension == "parquet":
+                compression = "auto"
+            else:
+                compression = "gz" if FileStorage.is_gzipped(file_path) else "none"
             try:
                 with clickhouse_connect.create_client(
                     host=client.credentials.host,
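
Note: for local files the compression flag is derived from the file itself via FileStorage.is_gzipped rather than from configuration, since the file on disk is the source of truth. A sketch of how such a check can work, assuming it inspects gzip's magic bytes (dlt's actual helper may differ):

    def is_gzipped(path: str) -> bool:
        # Every gzip stream starts with the magic bytes 0x1f 0x8b.
        with open(path, "rb") as f:
            return f.read(2) == b"\x1f\x8b"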
@@ -252,7 +260,7 @@ def __init__(
                         "allow_experimental_lightweight_delete": 1,
                         "allow_experimental_object_type": 1,
                     },
-                    compression=None if compression == "none" else compression,
+                    compression=compression,
                 )
             except clickhouse_connect.driver.exceptions.Error as e:
                 raise LoadJobTerminalException(
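
Note: with the format-aware value computed up front, the compression keyword can be passed through unchanged instead of being re-mapped to None at the call site. Assuming the surrounding code feeds the file to clickhouse_connect's insert_file helper (only create_client is visible in this diff, so the exact call is an assumption), the shape would be roughly:

    import clickhouse_connect
    from clickhouse_connect.driver.tools import insert_file  # assumed helper

    # Hypothetical connection and table values, for illustration only.
    with clickhouse_connect.create_client(host="localhost", port=8123) as ch_client:
        insert_file(
            ch_client,
            "my_table",
            "/tmp/load_file.jsonl.gz",
            fmt="JSONEachRow",  # ClickHouse input format for jsonl
            compression="gz",   # "auto" for parquet, "gz"/"none" for jsonl
        )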
23 changes: 3 additions & 20 deletions dlt/destinations/impl/clickhouse/configuration.py
@@ -73,9 +73,9 @@ def to_url(self) -> URL:
 
 @configspec
 class ClickHouseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
-    destination_type: Final[str] = "clickhouse"  # type: ignore[misc]
-    credentials: ClickHouseCredentials  # type: ignore
-    dataset_name: Final[str] = ""  # type: ignore
+    destination_type: Final[str] = dataclasses.field(default="clickhouse", init=False, repr=False, compare=False)  # type: ignore[misc]
+    credentials: ClickHouseCredentials = None
+    dataset_name: Final[str] = dataclasses.field(default="", init=False, repr=False, compare=False)  # type: ignore[misc]
     """dataset name in the destination to load data to, for schemas that are not default schema, it is used as dataset prefix"""
 
     # Primary key columns are used to build a sparse primary index which allows for efficient data retrieval,
@@ -88,20 +88,3 @@ def fingerprint(self) -> str:
         if self.credentials and self.credentials.host:
             return digest128(self.credentials.host)
         return ""
-
-    if TYPE_CHECKING:
-
-        def __init__(
-            self,
-            *,
-            credentials: ClickHouseCredentials = None,
-            dataset_name: str = None,
-            destination_name: str = None,
-            environment: str = None
-        ) -> None:
-            super().__init__(
-                credentials=credentials,
-                destination_name=destination_name,
-                environment=environment,
-            )
-            ...
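
Note: destination_type and dataset_name are now init=False dataclass fields, so the TYPE_CHECKING constructor stub that advertised a dataset_name argument is gone, and callers bind the dataset name after construction, as the test change below does. A hedged usage sketch (credential values would normally be resolved from secrets or environment):

    creds = ClickHouseCredentials()

    # Before: ClickHouseClientConfiguration(dataset_name="my_dataset", credentials=creds)
    # After: the dataset name is bound in a separate step.
    config = ClickHouseClientConfiguration(credentials=creds)._bind_dataset_name("my_dataset")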
36 changes: 21 additions & 15 deletions dlt/destinations/impl/clickhouse/sql_client.py
@@ -3,6 +3,7 @@
     Iterator,
     AnyStr,
     Any,
+    List,
     Optional,
     Sequence,
     ClassVar,
@@ -44,7 +45,7 @@ class ClickHouseDBApiCursorImpl(DBApiCursorImpl):
 class ClickHouseSqlClient(
     SqlClientBase[clickhouse_driver.dbapi.connection.Connection], DBTransaction
 ):
-    dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi.connection.Connection
+    dbapi: ClassVar[DBApi] = clickhouse_driver.dbapi
     capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()
 
     def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> None:
@@ -54,7 +55,7 @@ def __init__(self, dataset_name: str, credentials: ClickHouseCredentials) -> None:
         self.database_name = credentials.database
 
     def has_dataset(self) -> bool:
-        return super().has_dataset()
+        return len(self._list_tables()) > 0
 
     def open_connection(self) -> clickhouse_driver.dbapi.connection.Connection:
         self._conn = clickhouse_driver.dbapi.connect(
@@ -98,7 +99,17 @@ def create_dataset(self) -> None:
     def drop_dataset(self) -> None:
         # Since ClickHouse doesn't have schemas, we need to drop all tables in our virtual schema,
        # or collection of tables, that has the `dataset_name` as a prefix.
-        to_drop_results = self.execute_sql(
+        to_drop_results = self._list_tables()
+        for table in to_drop_results:
+            # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
+            # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
+            # To resolve this, we are forced to provide the full query string here.
+            self.execute_sql(
+                f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
+            )
+
+    def _list_tables(self) -> List[str]:
+        rows = self.execute_sql(
             """
             SELECT name
             FROM system.tables
@@ -110,14 +121,7 @@ def drop_dataset(self) -> None:
                 f"{self.dataset_name}%",
             ),
         )
-        for to_drop_result in to_drop_results:
-            table = to_drop_result[0]
-            # The "DROP TABLE" clause is discarded if we allow clickhouse_driver to handle parameter substitution.
-            # This is because the driver incorrectly substitutes the entire query string, causing the "DROP TABLE" keyword to be omitted.
-            # To resolve this, we are forced to provide the full query string here.
-            self.execute_sql(
-                f"""DROP TABLE {self.capabilities.escape_identifier(self.database_name)}.{self.capabilities.escape_identifier(table)} SYNC"""
-            )
+        return [row[0] for row in rows]
 
     @contextmanager
     @raise_database_error
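
Note: the refactor leaves parameter substitution to the driver only where values are involved (the system.tables lookup) and renders DDL identifiers into the statement up front, because, per the comments in the hunks above, clickhouse_driver mangles statements like DROP TABLE when asked to substitute into them. Schematically (the placeholder style in the SELECT is assumed; it is collapsed out of this hunk):

    # Values may go through driver-side substitution ...
    rows = self.execute_sql(
        "SELECT name FROM system.tables WHERE database == %s AND name LIKE %s",
        (
            self.database_name,
            f"{self.dataset_name}%",
        ),
    )

    # ... but identifiers in DDL are escaped and interpolated directly.
    for (table,) in rows:
        self.execute_sql(
            f"DROP TABLE {self.capabilities.escape_identifier(self.database_name)}"
            f".{self.capabilities.escape_identifier(table)} SYNC"
        )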
@@ -172,12 +176,14 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str:
         return f"{database_name}.{dataset_and_table}"
 
     @classmethod
-    def _make_database_exception(cls, ex: Exception) -> Exception:  # type: ignore[return]
+    def _make_database_exception(cls, ex: Exception) -> Exception:
         if isinstance(ex, clickhouse_driver.dbapi.errors.OperationalError):
-            if "Code: 57." in str(ex) or "Code: 82." in str(ex):
-                raise DatabaseTerminalException(ex)
+            if "Code: 57." in str(ex) or "Code: 82." in str(ex) or "Code: 47." in str(ex):
+                return DatabaseTerminalException(ex)
             elif "Code: 60." in str(ex) or "Code: 81." in str(ex):
-                raise DatabaseUndefinedRelation(ex)
+                return DatabaseUndefinedRelation(ex)
+            else:
+                return DatabaseTransientException(ex)
         elif isinstance(
             ex,
             (
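
Note: returning the mapped exception (instead of raising inside the classifier) lets the caller decide what to do, removes the implicit None fall-through that required the type: ignore[return], and adds a catch-all transient case. The matched numbers are standard ClickHouse server error codes; their assumed meanings:

    # Classification mirrors the hunk above (names from ClickHouse's ErrorCodes list;
    # an editorial gloss, not part of the commit).
    TERMINAL = {47: "UNKNOWN_IDENTIFIER", 57: "TABLE_ALREADY_EXISTS", 82: "DATABASE_ALREADY_EXISTS"}
    UNDEFINED_RELATION = {60: "UNKNOWN_TABLE", 81: "UNKNOWN_DATABASE"}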
2 changes: 1 addition & 1 deletion tests/load/clickhouse/test_clickhouse_table_builder.py
@@ -20,7 +20,7 @@ def clickhouse_client(empty_schema: Schema) -> ClickHouseClient:
     creds = ClickHouseCredentials()
     return ClickHouseClient(
         empty_schema,
-        ClickHouseClientConfiguration(dataset_name=f"test_{uniq_id()}", credentials=creds),
+        ClickHouseClientConfiguration(credentials=creds)._bind_dataset_name(f"test_{uniq_id()}"),
     )
 
 
11 changes: 9 additions & 2 deletions tests/load/pipeline/test_arrow_loading.py
@@ -1,11 +1,11 @@
 from datetime import datetime  # noqa: I251
 from typing import Any, Union, List, Dict, Tuple, Literal
-import os
 
 import pytest
 import numpy as np
 import pyarrow as pa
 import pandas as pd
+import base64
 
 import dlt
 from dlt.common import pendulum
@@ -42,6 +42,7 @@ def test_load_arrow_item(
         "redshift",
         "databricks",
         "synapse",
+        "clickhouse",
     )  # athena/redshift can't load TIME columns
     include_binary = not (
         destination_config.destination in ("redshift", "databricks")
@@ -102,11 +103,17 @@ def some_data():
                 row[i] = row[i].tobytes()
 
     if destination_config.destination == "redshift":
-        # Binary columns are hex formatted in results
+        # Redshift needs hex string
         for record in records:
             if "binary" in record:
                 record["binary"] = record["binary"].hex()
 
+    if destination_config.destination == "clickhouse":
+        # Clickhouse needs base64 string
+        for record in records:
+            if "binary" in record:
+                record["binary"] = base64.b64encode(record["binary"]).decode("ascii")
+
     for row in rows:
         for i in range(len(row)):
             if isinstance(row[i], datetime):
11 changes: 7 additions & 4 deletions tests/load/test_sql_client.py
@@ -369,7 +369,7 @@ def test_database_exceptions(client: SqlJobClientBase) -> None:
         with client.sql_client.execute_query(f"DELETE FROM {qualified_name} WHERE 1=1"):
             pass
     assert client.sql_client.is_dbapi_exception(term_ex.value.dbapi_exception)
-    if client.config.destination_type != "dremio":
+    if client.config.destination_type not in ["dremio", "clickhouse"]:
         with pytest.raises(DatabaseUndefinedRelation) as term_ex:
             with client.sql_client.execute_query("DROP SCHEMA UNKNOWN"):
                 pass
@@ -630,18 +630,21 @@ def assert_load_id(sql_client: SqlClientBase[TNativeConn], load_id: str) -> None:
 def prepare_temp_table(client: SqlJobClientBase) -> str:
     uniq_suffix = uniq_id()
     table_name = f"tmp_{uniq_suffix}"
-    iceberg_table_suffix = ""
+    ddl_suffix = ""
     coltype = "numeric"
     if client.config.destination_type == "athena":
-        iceberg_table_suffix = (
+        ddl_suffix = (
             f"LOCATION '{AWS_BUCKET}/ci/{table_name}' TBLPROPERTIES ('table_type'='ICEBERG',"
             " 'format'='parquet');"
         )
         coltype = "bigint"
         qualified_table_name = table_name
+    if client.config.destination_type == "clickhouse":
+        ddl_suffix = "ENGINE = MergeTree() ORDER BY col"
+        qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
     else:
         qualified_table_name = client.sql_client.make_qualified_table_name(table_name)
     client.sql_client.execute_sql(
-        f"CREATE TABLE {qualified_table_name} (col {coltype}) {iceberg_table_suffix};"
+        f"CREATE TABLE {qualified_table_name} (col {coltype}) {ddl_suffix};"
     )
     return table_name
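
Note: for ClickHouse the temporary table needs an engine and a sort key, so the rendered statement comes out roughly as follows (database and unique suffix are illustrative):

    ddl_suffix = "ENGINE = MergeTree() ORDER BY col"
    qualified_table_name = "my_db.tmp_ab12cd"  # illustrative qualified name
    print(f"CREATE TABLE {qualified_table_name} (col numeric) {ddl_suffix};")
    # -> CREATE TABLE my_db.tmp_ab12cd (col numeric) ENGINE = MergeTree() ORDER BY col;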
