Skip to content

Commit

Permalink
Add clickhouse connect as local fallback #1055
Browse files Browse the repository at this point in the history
Signed-off-by: Marcel Coetzee <[email protected]>
  • Loading branch information
Pipboyguy committed Apr 5, 2024
1 parent 8215f44 commit 881a0b9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 25 deletions.
40 changes: 23 additions & 17 deletions dlt/destinations/impl/clickhouse/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,30 +244,36 @@ def __init__(
statement = f"INSERT INTO {qualified_table_name} {table_function}"
elif not bucket_path:
# Local filesystem.
with clickhouse_connect.get_client(
host=client.credentials.host,
port=client.credentials.port,
database=client.credentials.database,
user_name=client.credentials.username,
password=client.credentials.password,
secure=bool(client.credentials.secure),
) as clickhouse_connect_client:
insert_file(
clickhouse_connect_client,
qualified_table_name,
try:
with clickhouse_connect.create_client(
host=client.credentials.host,
port=client.credentials.http_port,
database=client.credentials.database,
user_name=client.credentials.username,
password=client.credentials.password,
secure=bool(client.credentials.secure),
) as clickhouse_connect_client:
insert_file(
clickhouse_connect_client,
qualified_table_name,
file_path,
fmt=clickhouse_format,
)
except clickhouse_connect.driver.exceptions.Error as e:
raise LoadJobTerminalException(
file_path,
fmt=clickhouse_format,
database=client.database_name,
)
statement = ""
f"Clickhouse connection failed due to {e}.",
) from e
else:
raise LoadJobTerminalException(
file_path,
f"Clickhouse loader does not support '{bucket_scheme}' filesystem.",
)

with client.begin_transaction():
client.execute_sql(statement)
# Don't use dbapi driver for local files.
if bucket_path:
with client.begin_transaction():
client.execute_sql(statement)

def state(self) -> TLoadJobState:
# Report the job's load state; this implementation unconditionally returns the literal "completed".
# NOTE(review): presumably safe because the load itself is executed eagerly in __init__ and
# failures are raised there — confirm against the full class.
return "completed"
Expand Down
9 changes: 7 additions & 2 deletions dlt/destinations/impl/clickhouse/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,26 @@ class ClickhouseCredentials(ConnectionStringCredentials):
host: str # type: ignore
"""Host with running ClickHouse server."""
port: int = 9440
"""Port ClickHouse server is bound to. Defaults to 9000."""
"""Native port ClickHouse server is bound to. Defaults to 9440."""
http_port: int = 8443
"""HTTP Port to connect to ClickHouse server's HTTP interface.
The HTTP port is needed for non-staging pipelines.
Defaults to 8443."""
username: str = "default"
"""Database user. Defaults to 'default'."""
database: str = "default"
"""Database to connect to. Defaults to 'default'."""
secure: TSecureConnection = 1
"""Enables TLS encryption when connecting to ClickHouse Server. 0 means no encryption, 1 means encrypted."""
connect_timeout: int = 10
connect_timeout: int = 15
"""Timeout for establishing connection. Defaults to 15 seconds."""
send_receive_timeout: int = 300
"""Timeout for sending and receiving data. Defaults to 300 seconds."""

__config_gen_annotations__: ClassVar[List[str]] = [
"host",
"port",
"http_port",
"username",
"database",
"secure",
Expand Down
8 changes: 2 additions & 6 deletions dlt/destinations/impl/clickhouse/sql_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ def execute_query(
def fully_qualified_dataset_name(self, escape: bool = True) -> str:
if escape:
database_name = self.capabilities.escape_identifier(self.database_name)
dataset_name = self.capabilities.escape_identifier(
f"{self.dataset_name}"
)
dataset_name = self.capabilities.escape_identifier(f"{self.dataset_name}")
else:
database_name = self.database_name
dataset_name = f"{self.dataset_name}"
Expand All @@ -168,9 +166,7 @@ def make_qualified_table_name(self, table_name: str, escape: bool = True) -> str
)
else:
database_name = self.database_name
dataset_and_table = (
f"{self.dataset_name}{dataset_table_separator}{table_name}"
)
dataset_and_table = f"{self.dataset_name}{dataset_table_separator}{table_name}"
return f"{database_name}.{dataset_and_table}"

@classmethod
Expand Down

0 comments on commit 881a0b9

Please sign in to comment.