Skip to content

Commit

Permalink
Refactor Clickhouse SqlClient wireframing and update capabilities #1055
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Mar 12, 2024
1 parent 81091c7 commit 7b7edff
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 21 deletions.
14 changes: 8 additions & 6 deletions dlt/destinations/impl/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()

caps.preferred_loader_file_format = "jsonl"
caps.preferred_loader_file_format = "parquet"
caps.supported_loader_file_formats = ["jsonl", "parquet", "insert_values"]
caps.preferred_staging_file_format = "jsonl"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["jsonl", "parquet"]

caps.escape_identifier = escape_clickhouse_identifier
caps.escape_literal = escape_clickhouse_literal
Expand All @@ -22,11 +22,13 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.is_max_query_length_in_bytes = True
caps.max_query_length = 262144

# Clickhouse has limited support for transactional semantics, especially for `ReplicatedMergeTree`,
# the default ClickHouse cloud engine.
# ClickHouse has limited support for transactional semantics, especially for `ReplicatedMergeTree`, the default ClickHouse Cloud engine.
# It does, however, provide atomicity for individual DDL operations like `ALTER TABLE`.
# https://clickhouse-driver.readthedocs.io/en/latest/dbapi.html#clickhouse_driver.dbapi.connection.Connection.commit
# https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback
caps.supports_ddl_transactions = False
caps.supports_transactions = False
caps.supports_ddl_transactions = (
True # Not as part of a transaction, but single atomic DDL operations are supported.
)

return caps
34 changes: 34 additions & 0 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,43 @@
from dlt.destinations.impl.clickhouse.configuration import ClickhouseClientConfiguration
from dlt.destinations.job_client_impl import SqlJobClientWithStaging
from dlt.destinations.sql_client import SqlClientBase
from dlt.destinations.type_mapping import TypeMapper
from dlt.destinations.typing import TNativeConn


class ClickhouseTypeMapper(TypeMapper):
    """Type-name lookup tables between schema column types and ClickHouse types.

    NOTE(review): the attribute names suggest "sct" = schema column type and
    "dbt" = database type, as consumed by the ``TypeMapper`` base class --
    confirm against the base class.
    """

    # Schema type -> ClickHouse type name without any precision/scale arguments.
    sct_to_unbound_dbt = {
        "complex": "JSON",
        "text": "String",
        "double": "Float64",
        "bool": "Boolean",
        "date": "Date",
        "timestamp": "DateTime",
        "bigint": "Int64",
        "binary": "String",
        "wei": "Decimal",
    }

    # Schema type -> ClickHouse type template taking two integer arguments
    # (presumably precision and scale -- verify against the base class).
    sct_to_dbt = {
        "decimal": "Decimal(%i,%i)",
        "wei": "Decimal(%i,%i)",
    }

    # ClickHouse type name -> schema type (reverse lookup).
    dbt_to_sct = {
        "String": "text",
        "Float64": "double",
        "Boolean": "bool",
        "Date": "date",
        "DateTime": "timestamp",
        "Int64": "bigint",
        "JSON": "complex",
        "Decimal": "decimal",
    }

    def to_db_time_type(
        self,
        precision: Optional[int],
        table_format: TTableFormat = None,
    ) -> str:
        """Return the ClickHouse type name used for time columns.

        Both ``precision`` and ``table_format`` are accepted but ignored: the
        result is always ``"DateTime"``.
        """
        return "DateTime"


class ClickhouseClient(SqlJobClientWithStaging, SupportsStagingDestination):
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

Expand Down
54 changes: 39 additions & 15 deletions dlt/destinations/impl/clickhouse/sql_client.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,41 @@
from typing import AnyStr, Any, ContextManager, Optional, Sequence
from contextlib import contextmanager
from typing import (
Iterator,
AnyStr,
Any,
ContextManager,
Optional,
Sequence,
ClassVar,
)

import clickhouse_driver
from clickhouse_driver.dbapi.extras import DictCursor
import clickhouse_driver.dbapi as clickhouse_lib # type: ignore[import-untyped]
from clickhouse_driver.dbapi.connection import Connection # type: ignore[import-untyped]
from clickhouse_driver.dbapi.extras import DictCursor # type: ignore[import-untyped]

from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase
from dlt.destinations.typing import DBTransaction, DBApiCursor, TNativeConn
from dlt.common.destination import DestinationCapabilitiesContext
from dlt.common.runtime import logger
from dlt.destinations.impl.clickhouse import capabilities
from dlt.destinations.sql_client import DBApiCursorImpl, SqlClientBase, raise_database_error
from dlt.destinations.typing import DBTransaction, DBApiCursor, TNativeConn, DBApi


class ClickhouseDBApiCursorImpl(DBApiCursorImpl):
    """``DBApiCursorImpl`` variant whose native cursor is declared as a ``DictCursor``."""

    # Narrows the declared type of the wrapped cursor; adds no behaviour.
    native_cursor: DictCursor


class ClickhouseSqlClient(SqlClientBase[clickhouse_driver.Client], DBTransaction):
def open_connection(self) -> TNativeConn:
pass
class ClickhouseSqlClient(SqlClientBase[Connection], DBTransaction):
dbapi: ClassVar[DBApi] = clickhouse_lib
capabilities: ClassVar[DestinationCapabilitiesContext] = capabilities()

def close_connection(self) -> None:
@property
def native_connection(self) -> TNativeConn: # type: ignore
pass

def begin_transaction(self) -> ContextManager[DBTransaction]:
def open_connection(self) -> Connection:
pass

@property
def native_connection(self) -> TNativeConn:
def close_connection(self) -> None:
pass

def execute_sql(
Expand All @@ -42,9 +55,20 @@ def fully_qualified_dataset_name(self, escape: bool = True) -> str:
def _make_database_exception(ex: Exception) -> Exception:
pass

@contextmanager
@raise_database_error
def begin_transaction(self) -> Iterator[DBTransaction]:
    """Log a warning and yield this client as the transaction handle.

    Nothing is opened or committed here: the method only emits the warning
    below and then yields ``self``.
    """
    no_transaction_notice = (
        "Clickhouse does not support transactions! Each SQL statement is auto-committed"
        " separately."
    )
    logger.warning(no_transaction_notice)
    yield self

@raise_database_error
def rollback_transaction(self) -> None:
    """Always fail: rolling back is not implemented for this client.

    Raises:
        NotImplementedError: unconditionally.
    """
    reason = "You cannot rollback Clickhouse SQL statements."
    raise NotImplementedError(reason)


class TransactionsNotImplementedError(NotImplementedError):
    """Error signalling that transaction management is not available.

    The constructor takes no arguments; the message is fixed.
    """

    def __init__(self) -> None:
        # Initialise the base exception exactly once with the fixed message
        # (the flattened diff carried both the old and the new form of this call).
        super().__init__("Clickhouse does not support transaction management.")

0 comments on commit 7b7edff

Please sign in to comment.