Skip to content

Commit

Permalink
Update preliminary Clickhouse configurations #1055
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Mar 11, 2024
1 parent d736dee commit 0407ab8
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 28 deletions.
10 changes: 6 additions & 4 deletions dlt/common/configuration/specs/connection_string_credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ def parse_native_representation(self, native_value: Any) -> None:
raise InvalidConnectionString(self.__class__, native_value, self.drivername)
try:
url = make_url(native_value)
# update only values that are not None
# Update only values that are not None.
self.update({k: v for k, v in url._asdict().items() if v is not None})
if self.query is not None:
# query may be immutable so make it mutable
# Query may be immutable so make it mutable.
self.query = dict(self.query)
except Exception:
raise InvalidConnectionString(self.__class__, native_value, self.drivername)
except Exception as e:
raise InvalidConnectionString(
self.__class__, native_value, self.drivername
) from e

def on_resolved(self) -> None:
if self.password:
Expand Down
4 changes: 2 additions & 2 deletions dlt/common/destination/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ def __init__(

@configspec
class DestinationClientDwhWithStagingConfiguration(DestinationClientDwhConfiguration):
"""Configuration of a destination that can take data from staging destination"""
"""Configuration of a destination that can take data from a staging destination."""

staging_config: Optional[DestinationClientStagingConfiguration] = None
"""configuration of the staging, if present, injected at runtime"""
"""Configuration of the staging, if present, injected at runtime."""
if TYPE_CHECKING:

def __init__(
Expand Down
18 changes: 10 additions & 8 deletions dlt/destinations/impl/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@

def capabilities() -> DestinationCapabilitiesContext:
caps = DestinationCapabilitiesContext()

caps.preferred_loader_file_format = "jsonl"
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.supported_loader_file_formats = ["jsonl", "parquet", "insert_values"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]

caps.escape_identifier = escape_clickhouse_identifier
caps.escape_literal = escape_clickhouse_literal

caps.decimal_precision = (DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE)
caps.wei_precision = (76, 38)
caps.max_identifier_length = 1024
caps.max_column_identifier_length = 300
caps.max_query_length = 1024 * 1024
caps.is_max_query_length_in_bytes = False
caps.max_text_data_type_length = 10 * 1024 * 1024
caps.is_max_text_data_type_length_in_bytes = True

# Clickhouse has limited support for transactional semantics, especially for `ReplicatedMergeTree`,
# the default ClickHouse cloud engine.
# https://clickhouse-driver.readthedocs.io/en/latest/dbapi.html#clickhouse_driver.dbapi.connection.Connection.commit
# https://clickhouse.com/docs/en/guides/developer/transactional#transactions-commit-and-rollback
caps.supports_ddl_transactions = False
caps.supports_transactions = False

return caps
77 changes: 63 additions & 14 deletions dlt/destinations/impl/clickhouse/configuration.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,88 @@
from typing import TYPE_CHECKING, ClassVar, List, Optional, Final
from typing import ClassVar, List, Any, Final, TYPE_CHECKING

from dlt.common.configuration import configspec
from dlt.common.destination.reference import DestinationClientDwhWithStagingConfiguration
from dlt.common.configuration.specs import ConnectionStringCredentials
from dlt.common.destination.reference import (
DestinationClientDwhWithStagingConfiguration,
)
from dlt.common.libs.sql_alchemy import URL
from dlt.common.utils import digest128


@configspec
class ClickhouseCredentials(ConnectionStringCredentials):
drivername: str = "clickhouse"
host: str
"""Host with running ClickHouse server."""
port: int = 9000
"""Port ClickHouse server is bound to. Defaults to 9000."""
user: str = "default"
"""Database user. Defaults to 'default'."""
database: str = "default"
"""database connect to. Defaults to 'default'."""
connect_timeout: int = 10
"""Timeout for establishing connection. Defaults to 10 seconds."""
send_receive_timeout: int = 300
"""Timeout for sending and receiving data. Defaults to 300 seconds."""

__config_gen_annotations__: ClassVar[List[str]] = [
"host",
"port",
"user",
"database",
"connect_timeout",
"send_receive_timeout",
]

def parse_native_representation(self, native_value: Any) -> None:
super().parse_native_representation(native_value)
self.connect_timeout = int(self.query.get("connect_timeout", self.connect_timeout))
self.send_receive_timeout = int(
self.query.get("send_receive_timeout", self.send_receive_timeout)
)
if not self.is_partial():
self.resolve()

def to_url(self) -> URL:
url = super().to_url()
url.update_query_pairs(
[
("connect_timeout", str(self.connect_timeout)),
("send_receive_timeout", str(self.send_receive_timeout)),
]
)
return url


@configspec
class ClickhouseClientConfiguration(DestinationClientDwhWithStagingConfiguration):
destination_type: Final[str] = "clickhouse" # type: ignore
destination_type: Final[str] = "clickhouse" # type: ignore[misc]
credentials: ClickhouseCredentials

http_timeout: float = 15.0
file_upload_timeout: float = 30 * 60.0
retry_deadline: float = 60.0
create_indexes: bool = True

__config_gen_annotations__: ClassVar[List[str]] = []
def fingerprint(self) -> str:
"""Returns a fingerprint of host part of a connection string."""
if self.credentials and self.credentials.host:
return digest128(self.credentials.host)
return ""

if TYPE_CHECKING:

def __init__(
self,
*,
credentials: ClickhouseCredentials = None,
dataset_name: str = None,
default_schema_name: Optional[str],
http_timeout: float = 15.0,
file_upload_timeout: float = 30 * 60.0,
retry_deadline: float = 60.0,
default_schema_name: str = None,
destination_name: str = None,
environment: str = None
) -> None:
super().__init__(
credentials=credentials,
dataset_name=dataset_name,
default_schema_name=default_schema_name,
destination_name=destination_name,
environment=environment,
)
self.retry_deadline = retry_deadline
self.file_upload_timeout = file_upload_timeout
self.http_timeout = http_timeout
...

0 comments on commit 0407ab8

Please sign in to comment.